##HW6 SOLUTION CODE:

Scroll past this (~900 lines) to get to the project implementation. I have left this section untouched.

In [5]:
import os
import re
import random
import urllib.request
import numpy as np
import matplotlib.pyplot as plt
from wordcloud import WordCloud
from matplotlib import animation
from matplotlib.animation import FuncAnimation

class HiddenMarkovModel:
    '''
    Class implementation of Hidden Markov Models.
    '''

    def __init__(self, A, O):
        '''
        Initializes an HMM. Assumes the following:
            - States and observations are integers starting from 0. 
            - There is a start state (see notes on A_start below). There
              is no integer associated with the start state, only
              probabilities in the vector A_start.
            - There is no end state. 
        Arguments:
            A:          Transition matrix with dimensions L x L.
                        The (i, j)^th element is the probability of
                        transitioning from state i to state j. Note that
                        this does not include the starting probabilities.
            O:          Observation matrix with dimensions L x D.
                        The (i, j)^th element is the probability of
                        emitting observation j given state i.
        Parameters:
            L:          Number of states.
            D:          Number of observations.
            
            A:          The transition matrix.
            
            O:          The observation matrix.
            
            A_start:    Starting transition probabilities. The i^th element
                        is the probability of transitioning from the start
                        state to state i. For simplicity, we assume that
                        this distribution is uniform.
        '''

        self.L = len(A)
        self.D = len(O[0])
        self.A = A
        self.O = O
        self.A_start = [1. / self.L for _ in range(self.L)]


    def viterbi(self, x):
        '''
        Uses the Viterbi algorithm to find the max probability state 
        sequence corresponding to a given input sequence.
        Arguments:
            x:          Input sequence in the form of a list of length M,
                        consisting of integers ranging from 0 to D - 1.
        Returns:
            max_seq:    Output sequence corresponding to x with the highest
                        probability.
        '''

        M = len(x)      # Length of sequence.

        # The (i, j)^th elements of probs and seqs are the max probability
        # of the prefix of length i ending in state j and the prefix
        # that gives this probability, respectively.
        #
        # For instance, probs[1][0] is the probability of the prefix of
        # length 1 ending in state 0.
        probs = [[0. for _ in range(self.L)] for _ in range(M + 1)]
        seqs = [['' for _ in range(self.L)] for _ in range(M + 1)]

        # Calculate initial prefixes and probabilities.
        for curr in range(self.L):
            probs[1][curr] = self.A_start[curr] * self.O[curr][x[0]]
            seqs[1][curr] = str(curr)

        # Calculate best prefixes and probabilities throughout sequence.
        for t in range(2, M + 1):
            # Iterate over all possible current states.
            for curr in range(self.L):
                max_prob = float("-inf")
                max_prefix = ''

                # Iterate over all possible previous states to find one
                # that would maximize the probability of the current state.
                for prev in range(self.L):
                    curr_prob = probs[t - 1][prev] \
                                * self.A[prev][curr] \
                                * self.O[curr][x[t - 1]]

                    # Continually update max probability and prefix.
                    if curr_prob >= max_prob:
                        max_prob = curr_prob
                        max_prefix = seqs[t - 1][prev]

                # Store the max probability and prefix.
                probs[t][curr] = max_prob
                seqs[t][curr] = max_prefix + str(curr)

        # Find the index of the max probability of a sequence ending in x^M
        # and the corresponding output sequence.
        max_i = max(enumerate(probs[-1]), key=lambda x: x[1])[0]
        max_seq = seqs[-1][max_i]

        return max_seq


    def forward(self, x, normalize=False):
        '''
        Uses the forward algorithm to calculate the alpha probability
        vectors corresponding to a given input sequence.
        Arguments:
            x:          Input sequence in the form of a list of length M,
                        consisting of integers ranging from 0 to D - 1.
            normalize:  Whether to normalize each set of alpha_j(i) vectors
                        at each i. This is useful to avoid underflow in
                        unsupervised learning.
        Returns:
            alphas:     Vector of alphas.
                        The (i, j)^th element of alphas is alpha_j(i),
                        i.e. the probability of observing prefix x^1:i
                        and state y^i = j.
                        e.g. alphas[1][0] corresponds to the probability
                        of observing x^1:1, i.e. the first observation,
                        given that y^1 = 0, i.e. the first state is 0.
        '''

        M = len(x)      # Length of sequence.
        alphas = [[0. for _ in range(self.L)] for _ in range(M + 1)]

        # Note that alpha_j(0) is already correct for all j's.
        # Calculate alpha_j(1) for all j's.
        for curr in range(self.L):
            alphas[1][curr] = self.A_start[curr] * self.O[curr][x[0]]

        # Calculate alphas throughout sequence.
        for t in range(1, M):
            # Iterate over all possible current states.
            for curr in range(self.L):
                prob = 0

                # Iterate over all possible previous states to accumulate
                # the probabilities of all paths from the start state to
                # the current state.
                for prev in range(self.L):
                    prob += alphas[t][prev] \
                            * self.A[prev][curr] \
                            * self.O[curr][x[t]]

                # Store the accumulated probability.
                alphas[t + 1][curr] = prob

            if normalize:
                norm = sum(alphas[t + 1])
                for curr in range(self.L):
                    alphas[t + 1][curr] /= norm

        return alphas


    def backward(self, x, normalize=False):
        '''
        Uses the backward algorithm to calculate the beta probability
        vectors corresponding to a given input sequence.
        Arguments:
            x:          Input sequence in the form of a list of length M,
                        consisting of integers ranging from 0 to D - 1.
            normalize:  Whether to normalize each set of alpha_j(i) vectors
                        at each i. This is useful to avoid underflow in
                        unsupervised learning.
        Returns:
            betas:      Vector of betas.
                        The (i, j)^th element of betas is beta_j(i), i.e.
                        the probability of observing prefix x^(i+1):M and
                        state y^i = j.
                        e.g. betas[M][0] corresponds to the probability
                        of observing x^M+1:M, i.e. no observations,
                        given that y^M = 0, i.e. the last state is 0.
        '''

        M = len(x)      # Length of sequence.
        betas = [[0. for _ in range(self.L)] for _ in range(M + 1)]

        # Initialize initial betas.
        for curr in range(self.L):
            betas[-1][curr] = 1

        # Calculate betas throughout sequence.
        for t in range(-1, -M - 1, -1):
            # Iterate over all possible current states.
            for curr in range(self.L):
                prob = 0

                # Iterate over all possible next states to accumulate
                # the probabilities of all paths from the end state to
                # the current state.
                for nxt in range(self.L):
                    if t == -M:
                        prob += betas[t][nxt] \
                                * self.A_start[nxt] \
                                * self.O[nxt][x[t]]

                    else:
                        prob += betas[t][nxt] \
                                * self.A[curr][nxt] \
                                * self.O[nxt][x[t]]

                # Store the accumulated probability.
                betas[t - 1][curr] = prob

            if normalize:
                norm = sum(betas[t - 1])
                for curr in range(self.L):
                    betas[t - 1][curr] /= norm

        return betas


    def supervised_learning(self, X, Y):
        '''
        Trains the HMM using the Maximum Likelihood closed form solutions
        for the transition and observation matrices on a labeled
        datset (X, Y). Note that this method does not return anything, but
        instead updates the attributes of the HMM object.
        Arguments:
            X:          A dataset consisting of input sequences in the form
                        of lists of variable length, consisting of integers 
                        ranging from 0 to D - 1. In other words, a list of
                        lists.
            Y:          A dataset consisting of state sequences in the form
                        of lists of variable length, consisting of integers 
                        ranging from 0 to L - 1. In other words, a list of
                        lists.
                        Note that the elements in X line up with those in Y.
        '''

        # Calculate each element of A using the M-step formulas.
        for curr in range(self.L):
            for nxt in range(self.L):
                num = 0.
                den = 0.

                for i in range(len(X)):
                    x = X[i]
                    y = Y[i]
                    M = len(x)
        
                    num += len([1 for i in range(M - 1) \
                                if y[i] == curr and y[i + 1] == nxt])
                    den += len([1 for i in range(M - 1) if y[i] == curr])

                self.A[curr][nxt] = num / den

        # Calculate each element of O using the M-step formulas.
        for curr in range(self.L):
            for xt in range(self.D):
                num = 0.
                den = 0.

                for i in range(len(X)):
                    x = X[i]
                    y = Y[i]
                    M = len(x)
        
                    num += len([1 for i in range(M) \
                                if y[i] == curr and x[i] == xt])
                    den += len([1 for i in range(M) if y[i] == curr])

                self.O[curr][xt] = num / den


    def unsupervised_learning(self, X, N_iters):
        '''
        Trains the HMM using the Baum-Welch algorithm on an unlabeled
        datset X. Note that this method does not return anything, but
        instead updates the attributes of the HMM object.
        Arguments:
            X:          A dataset consisting of input sequences in the form
                        of lists of length M, consisting of integers ranging
                        from 0 to D - 1. In other words, a list of lists.
            N_iters:    The number of iterations to train on.
        '''

        # Note that a comment starting with 'E' refers to the fact that
        # the code under the comment is part of the E-step.

        # Similarly, a comment starting with 'M' refers to the fact that
        # the code under the comment is part of the M-step.

        for iteration in range(1, N_iters + 1):
            if iteration % 10 == 0:
                print("Iteration: " + str(iteration))

            # Numerator and denominator for the update terms of A and O.
            A_num = [[0. for i in range(self.L)] for j in range(self.L)]
            O_num = [[0. for i in range(self.D)] for j in range(self.L)]
            A_den = [0. for i in range(self.L)]
            O_den = [0. for i in range(self.L)]

            # For each input sequence:
            for x in X:
                M = len(x)
                # Compute the alpha and beta probability vectors.
                alphas = self.forward(x, normalize=True)
                betas = self.backward(x, normalize=True)

                # E: Update the expected observation probabilities for a
                # given (x, y).
                # The i^th index is P(y^t = i, x).
                for t in range(1, M + 1):
                    P_curr = [0. for _ in range(self.L)]
                    
                    for curr in range(self.L):
                        P_curr[curr] = alphas[t][curr] * betas[t][curr]

                    # Normalize the probabilities.
                    norm = sum(P_curr)
                    for curr in range(len(P_curr)):
                        P_curr[curr] /= norm

                    for curr in range(self.L):
                        if t != M:
                            A_den[curr] += P_curr[curr]
                        O_den[curr] += P_curr[curr]
                        O_num[curr][x[t - 1]] += P_curr[curr]

                # E: Update the expectedP(y^j = a, y^j+1 = b, x) for given (x, y)
                for t in range(1, M):
                    P_curr_nxt = [[0. for _ in range(self.L)] for _ in range(self.L)]

                    for curr in range(self.L):
                        for nxt in range(self.L):
                            P_curr_nxt[curr][nxt] = alphas[t][curr] \
                                                    * self.A[curr][nxt] \
                                                    * self.O[nxt][x[t]] \
                                                    * betas[t + 1][nxt]

                    # Normalize:
                    norm = 0
                    for lst in P_curr_nxt:
                        norm += sum(lst)
                    for curr in range(self.L):
                        for nxt in range(self.L):
                            P_curr_nxt[curr][nxt] /= norm

                    # Update A_num
                    for curr in range(self.L):
                        for nxt in range(self.L):
                            A_num[curr][nxt] += P_curr_nxt[curr][nxt]

            for curr in range(self.L):
                for nxt in range(self.L):
                    self.A[curr][nxt] = A_num[curr][nxt] / A_den[curr]

            for curr in range(self.L):
                for xt in range(self.D):
                    self.O[curr][xt] = O_num[curr][xt] / O_den[curr]

    def generate_emission(self, M):
        '''
        Generates an emission of length M, assuming that the starting state
        is chosen uniformly at random. 
        Arguments:
            M:          Length of the emission to generate.
        Returns:
            emission:   The randomly generated emission as a list.
            states:     The randomly generated states as a list.
        '''

        emission = []
        state = random.choice(range(self.L))
        states = []

        for t in range(M):
            # Append state.
            states.append(state)

            # Sample next observation.
            rand_var = random.uniform(0, 1)
            next_obs = 0

            while rand_var > 0:
                rand_var -= self.O[state][next_obs]
                next_obs += 1

            next_obs -= 1
            emission.append(next_obs)

            # Sample next state.
            rand_var = random.uniform(0, 1)
            next_state = 0

            while rand_var > 0:
                rand_var -= self.A[state][next_state]
                next_state += 1

            next_state -= 1
            state = next_state

        return emission, states


    def probability_alphas(self, x):
        '''
        Finds the maximum probability of a given input sequence using
        the forward algorithm.
        Arguments:
            x:          Input sequence in the form of a list of length M,
                        consisting of integers ranging from 0 to D - 1.
        Returns:
            prob:       Total probability that x can occur.
        '''

        # Calculate alpha vectors.
        alphas = self.forward(x)

        # alpha_j(M) gives the probability that the output sequence ends
        # in j. Summing this value over all possible states j gives the
        # total probability of x paired with any output sequence, i.e. the
        # probability of x.
        prob = sum(alphas[-1])
        return prob


    def probability_betas(self, x):
        '''
        Finds the maximum probability of a given input sequence using
        the backward algorithm.
        Arguments:
            x:          Input sequence in the form of a list of length M,
                        consisting of integers ranging from 0 to D - 1.
        Returns:
            prob:       Total probability that x can occur.
        '''

        betas = self.backward(x)

        # beta_j(0) gives the probability of the output sequence. Summing
        # this over all states and then normalizing gives the total
        # probability of x paired with any output sequence, i.e. the
        # probability of x.
        prob = sum([betas[1][k] * self.A_start[k] * self.O[k][x[0]] \
            for k in range(self.L)])

        return prob


def supervised_HMM(X, Y):
    '''
    Helper function to train a supervised HMM. The function determines the
    number of unique states and observations in the given data, initializes
    the transition and observation matrices, creates the HMM, and then runs
    the training function for supervised learning.
    Arguments:
        X:          A dataset consisting of input sequences in the form
                    of lists of variable length, consisting of integers 
                    ranging from 0 to D - 1. In other words, a list of lists.
        Y:          A dataset consisting of state sequences in the form
                    of lists of variable length, consisting of integers 
                    ranging from 0 to L - 1. In other words, a list of lists.
                    Note that the elements in X line up with those in Y.
    '''

    # Make a set of observations.
    observations = set()
    for x in X:
        observations |= set(x)

    # Make a set of states.
    states = set()
    for y in Y:
        states |= set(y)
    
    # Compute L and D.
    L = len(states)
    D = len(observations)

    # Randomly initialize and normalize matrices A and O.
    A = [[random.random() for i in range(L)] for j in range(L)]

    for i in range(len(A)):
        norm = sum(A[i])
        for j in range(len(A[i])):
            A[i][j] /= norm
    
    O = [[random.random() for i in range(D)] for j in range(L)]

    for i in range(len(O)):
        norm = sum(O[i])
        for j in range(len(O[i])):
            O[i][j] /= norm

    # Train an HMM with labeled data.
    HMM = HiddenMarkovModel(A, O)
    HMM.supervised_learning(X, Y)

    return HMM


def unsupervised_HMM(X, n_states, N_iters,rng=np.random.RandomState(1)):
    '''
    Helper function to train an unsupervised HMM. The function determines the
    number of unique observations in the given data, initializes
    the transition and observation matrices, creates the HMM, and then runs
    the training function for unsupervised learing.
    Arguments:
        X:          A dataset consisting of input sequences in the form
                    of lists of variable length, consisting of integers 
                    ranging from 0 to D - 1. In other words, a list of lists.
        n_states:   Number of hidden states to use in training.
        
        N_iters:    The number of iterations to train on.
        rng:        The random number generator for reproducible result.
                    Default to RandomState(1).
    '''

    # Make a set of observations.
    observations = set()
    for x in X:
        observations |= set(x)
    
    # Compute L and D.
    L = n_states
    D = len(observations)

    # Randomly initialize and normalize matrices A.
    A = [[rng.random() for i in range(L)] for j in range(L)]

    for i in range(len(A)):
        norm = sum(A[i])
        for j in range(len(A[i])):
            A[i][j] /= norm
    
    # Randomly initialize and normalize matrix O.
    O = [[rng.random() for i in range(D)] for j in range(L)]

    for i in range(len(O)):
        norm = sum(O[i])
        for j in range(len(O[i])):
            O[i][j] /= norm

    # Train an HMM with unlabeled data.
    HMM = HiddenMarkovModel(A, O)
    HMM.unsupervised_learning(X, N_iters)

    return HMM

########################################
# CS/CNS/EE 155 2018
# Problem Set 6
#
# Author:       Andrew Kang
# Description:  Set 6 HMM helper
########################################

import re
import numpy as np
import matplotlib.pyplot as plt
from wordcloud import WordCloud
from matplotlib import animation
from matplotlib.animation import FuncAnimation


####################
# WORDCLOUD FUNCTIONS
####################

def mask():
    # Parameters.
    r = 128
    d = 2 * r + 1

    # Get points in a circle.
    y, x = np.ogrid[-r:d-r, -r:d-r]
    circle = (x**2 + y**2 <= r**2)

    # Create mask.
    mask = 255 * np.ones((d, d), dtype=np.uint8)
    mask[circle] = 0

    return mask

def text_to_wordcloud(text, max_words=50, title='', show=True):
    plt.close('all')

    # Generate a wordcloud image.
    wordcloud = WordCloud(random_state=0,
                          max_words=max_words,
                          background_color='white',
                          mask=mask()).generate(text)

    # Show the image.
    if show:
        plt.imshow(wordcloud, interpolation='bilinear')
        plt.axis('off')
        plt.title(title, fontsize=24)
        plt.show()

    return wordcloud

def states_to_wordclouds(hmm, obs_map, max_words=50, show=True):
    # Initialize.
    M = 100000
    n_states = len(hmm.A)
    obs_map_r = obs_map_reverser(obs_map)
    wordclouds = []

    # Generate a large emission.
    emission, states = hmm.generate_emission(M)

    # For each state, get a list of observations that have been emitted
    # from that state.
    obs_count = []
    for i in range(n_states):
        obs_lst = np.array(emission)[np.where(np.array(states) == i)[0]]
        obs_count.append(obs_lst)

    # For each state, convert it into a wordcloud.
    for i in range(n_states):
        obs_lst = obs_count[i]
        sentence = [obs_map_r[j] for j in obs_lst]
        sentence_str = ' '.join(sentence)

        wordclouds.append(text_to_wordcloud(sentence_str, max_words=max_words, title='State %d' % i, show=show))

    return wordclouds


####################
# HMM FUNCTIONS
####################

def parse_observations(text):
    # Convert text to dataset.
    lines = [line.split() for line in text.split('\n') if line.split()]

    obs_counter = 0
    obs = []
    obs_map = {}

    for line in lines:
        obs_elem = []
        
        for word in line:
            word = re.sub(r'[^\w]', '', word).lower()
            if word not in obs_map:
                # Add unique words to the observations map.
                obs_map[word] = obs_counter
                obs_counter += 1
            
            # Add the encoded word.
            obs_elem.append(obs_map[word])
        
        # Add the encoded sequence.
        obs.append(obs_elem)

    return obs, obs_map

def obs_map_reverser(obs_map):
    obs_map_r = {}

    for key in obs_map:
        obs_map_r[obs_map[key]] = key

    return obs_map_r

def sample_sentence(hmm, obs_map, n_words=100):
    # Get reverse map.
    obs_map_r = obs_map_reverser(obs_map)

    # Sample and convert sentence.
    emission, states = hmm.generate_emission(n_words)
    sentence = [obs_map_r[i] for i in emission]

    return ' '.join(sentence).capitalize() + '...'


####################
# HMM VISUALIZATION FUNCTIONS
####################

def visualize_sparsities(hmm, O_max_cols=50, O_vmax=0.1):
    plt.close('all')
    plt.set_cmap('viridis')

    # Visualize sparsity of A.
    plt.imshow(hmm.A, vmax=1.0)
    plt.colorbar()
    plt.title('Sparsity of A matrix')
    plt.show()

    # Visualize parsity of O.
    plt.imshow(np.array(hmm.O)[:, :O_max_cols], vmax=O_vmax, aspect='auto')
    plt.colorbar()
    plt.title('Sparsity of O matrix')
    plt.show()


####################
# HMM ANIMATION FUNCTIONS
####################

def animate_emission(hmm, obs_map, M=8, height=12, width=12, delay=1):
    # Parameters.
    lim = 1200
    text_x_offset = 40
    text_y_offset = 80
    x_offset = 580
    y_offset = 520
    R = 420
    r = 100
    arrow_size = 20
    arrow_p1 = 0.03
    arrow_p2 = 0.02
    arrow_p3 = 0.06
    
    # Initialize.
    n_states = len(hmm.A)
    obs_map_r = obs_map_reverser(obs_map)
    wordclouds = states_to_wordclouds(hmm, obs_map, max_words=20, show=False)

    # Initialize plot.    
    fig, ax = plt.subplots()
    fig.set_figheight(height)
    fig.set_figwidth(width)
    ax.grid('off')
    plt.axis('off')
    ax.set_xlim([0, lim])
    ax.set_ylim([0, lim])

    # Plot each wordcloud.
    for i, wordcloud in enumerate(wordclouds):
        x = x_offset + int(R * np.cos(np.pi * 2 * i / n_states))
        y = y_offset + int(R * np.sin(np.pi * 2 * i / n_states))
        ax.imshow(wordcloud.to_array(), extent=(x - r, x + r, y - r, y + r), aspect='auto', zorder=-1)

    # Initialize text.
    text = ax.text(text_x_offset, lim - text_y_offset, '', fontsize=24)
        
    # Make the arrows.
    zorder_mult = n_states ** 2 * 100
    arrows = []
    for i in range(n_states):
        row = []
        for j in range(n_states):
            # Arrow coordinates.
            x_i = x_offset + R * np.cos(np.pi * 2 * i / n_states)
            y_i = y_offset + R * np.sin(np.pi * 2 * i / n_states)
            x_j = x_offset + R * np.cos(np.pi * 2 * j / n_states)
            y_j = y_offset + R * np.sin(np.pi * 2 * j / n_states)
            
            dx = x_j - x_i
            dy = y_j - y_i
            d = np.sqrt(dx**2 + dy**2)

            if i != j:
                arrow = ax.arrow(x_i + (r/d + arrow_p1) * dx + arrow_p2 * dy,
                                 y_i + (r/d + arrow_p1) * dy + arrow_p2 * dx,
                                 (1 - 2 * r/d - arrow_p3) * dx,
                                 (1 - 2 * r/d - arrow_p3) * dy,
                                 color=(1 - hmm.A[i][j], ) * 3,
                                 head_width=arrow_size, head_length=arrow_size,
                                 zorder=int(hmm.A[i][j] * zorder_mult))
            else:
                arrow = ax.arrow(x_i, y_i, 0, 0,
                                 color=(1 - hmm.A[i][j], ) * 3,
                                 head_width=arrow_size, head_length=arrow_size,
                                 zorder=int(hmm.A[i][j] * zorder_mult))

            row.append(arrow)
        arrows.append(row)

    emission, states = hmm.generate_emission(M)

    def animate(i):
        if i >= delay:
            i -= delay

            if i == 0:
                arrows[states[0]][states[0]].set_color('red')
            elif i == 1:
                arrows[states[0]][states[0]].set_color((1 - hmm.A[states[0]][states[0]], ) * 3)
                arrows[states[i - 1]][states[i]].set_color('red')
            else:
                arrows[states[i - 2]][states[i - 1]].set_color((1 - hmm.A[states[i - 2]][states[i - 1]], ) * 3)
                arrows[states[i - 1]][states[i]].set_color('red')

            # Set text.
            text.set_text(' '.join([obs_map_r[e] for e in emission][:i+1]).capitalize())

            return arrows + [text]

    # Animate!
    print('\nAnimating...')
    anim = FuncAnimation(fig, animate, frames=M+delay, interval=1000)

    return anim

    # honestly this function is so jank but who even fuckin cares
    # i don't even remember how or why i wrote this mess
    # no one's gonna read this
    # hey if you see this tho hmu on fb let's be friends

class Utility:
    '''
    Utility for the problem files.
    '''

    def __init__():
        pass

    @staticmethod
    def load_sequence(n):
        '''
        Load the file 'sequence_data<n>.txt' for a given n.
        Arguments:
            n:          Sequence index.
        Returns:
            A:          The transition matrix.
            O:          The observation matrix.
            seqs:       Input sequences.
        '''
        A = []
        O = []
        seqs = []

        # For each file:
        with urllib.request.urlopen(f'https://raw.githubusercontent.com/lakigigar/Caltech-CS155-2021/main/psets/set6/data/sequence_data{n}.txt') as f:
            # Read the parameters.
            L, D = [int(x) for x in f.readline().decode('utf-8').strip().split('\t')]

            # Read the transition matrix.
            for i in range(L):
                A.append([float(x) for x in f.readline().decode('utf-8').strip().split('\t')])

            # Read the observation matrix.
            for i in range(L):
                O.append([float(x) for x in f.readline().decode('utf-8').strip().split('\t')])

            # The rest of the file consists of sequences.
            while True:
                seq = f.readline().decode('utf-8').strip()
                if seq == '':
                    break
                seqs.append([int(x) for x in seq])

        return A, O, seqs

    @staticmethod
    def load_ron():
        '''
        Loads the file 'ron.txt'.
        Returns:
            moods:      Sequnces of states, i.e. a list of lists.
                        Each sequence represents half a year of data.
            mood_map:   A hash map that maps each state to an integer.
            genres:     Sequences of observations, i.e. a list of lists.
                        Each sequence represents half a year of data.
            genre_map:  A hash map that maps each observation to an integer.
        '''
        moods = []
        mood_map = {}
        genres = []
        genre_map = {}
        mood_counter = 0
        genre_counter = 0

        with urllib.request.urlopen("https://raw.githubusercontent.com/lakigigar/Caltech-CS155-2021/main/psets/set6/data/ron.txt") as f:
            mood_seq = []
            genre_seq = []

            while True:
                line = f.readline().decode('utf-8').strip()

                if line == '' or line == '-':
                    # A half year has passed. Add the current sequence to
                    # the list of sequences.
                    moods.append(mood_seq)
                    genres.append(genre_seq)
                    # Start new sequences.
                    mood_seq = []
                    genre_seq = []
                
                if line == '':
                    break
                elif line == '-':
                    continue
                
                mood, genre = line.split()
                
                # Add new moods to the mood state hash map.
                if mood not in mood_map:
                    mood_map[mood] = mood_counter
                    mood_counter += 1

                mood_seq.append(mood_map[mood])

                # Add new genres to the genre observation hash map.
                if genre not in genre_map:
                    genre_map[genre] = genre_counter
                    genre_counter += 1

                # Convert the genre into an integer.
                genre_seq.append(genre_map[genre])

        return moods, mood_map, genres, genre_map

    @staticmethod
    def load_ron_hidden():
        '''
        Loads the file 'ron.txt' and hides the states.
        Returns:
            genres:     The observations.
            genre_map:  A hash map that maps each observation to an integer.
        '''
        moods, mood_map, genres, genre_map = Utility.load_ron()

        return genres, genre_map

# DATA PRE-PROCESSING

Read in the data to generate state data, syllable data, and rhyming data:

In [356]:
import pandas as pd
#Each word is an observation state (regardless of syllables)
#Keep track of word's syllables, so we can generate right number of syllables

worddata = pd.read_csv("https://github.com/lakigigar/Caltech-CS155-2021/raw/main/projects/project3/data/Syllable_dictionary.txt", header = None)
nwords = len(worddata)
words = []
syl = np.zeros((nwords,2))
end = np.zeros((nwords,2))

for i in range(nwords):
  datastring = worddata[0].iloc[i].split(" ")
  words.append(datastring[0])
  count = 0
  for j in range(1,len(datastring)):
    if datastring[j][0]=="E":
      end[i,count] = 1
    syl[i,count] = int(datastring[j].split("E")[-1])
    count = count+1

worddata["WORDS"] = words
worddata["SYL1"] = syl[:,0]
worddata["SYL2"] = syl[:,1]
worddata["END1"] = end[:,0]
worddata["END2"] = end[:,1]
worddata = worddata.drop(columns=0)
worddata["STATEID"] = worddata.index
display(worddata)

#naive rhyming dictionary (match last 4 characters)
rhymedict = {}
for word in words:
  wordrhymes = []
  for word2 in words:
    if word[-4:]==word2[-4:] and word!=word2:
      wordrhymes.append(word2)
  rhymedict[word]=wordrhymes

#more-correct rhyming dictionary (use the CMU database)
!pip install pronouncing

import pronouncing

rhymedict2 = {}
for word in words:
  wordrhymes = []
  r = pronouncing.rhymes(word)
  for word2 in words:
    if word2 in r:
      wordrhymes.append(word2)
  rhymedict2[word] = wordrhymes

Unnamed: 0,WORDS,SYL1,SYL2,END1,END2,STATEID
0,'gainst,1.0,0.0,0.0,0.0,0
1,'greeing,1.0,2.0,1.0,0.0,1
2,'scaped,1.0,0.0,0.0,0.0,2
3,'tis,1.0,0.0,0.0,0.0,3
4,'twixt,1.0,0.0,0.0,0.0,4
...,...,...,...,...,...,...
3200,yours,1.0,0.0,0.0,0.0,3200
3201,youth,1.0,0.0,0.0,0.0,3201
3202,youth's,1.0,0.0,0.0,0.0,3202
3203,youthful,2.0,0.0,0.0,0.0,3203


Collecting pronouncing
  Downloading https://files.pythonhosted.org/packages/7f/c6/9dc74a3ddca71c492e224116b6654592bfe5717b4a78582e4d9c3345d153/pronouncing-0.2.0.tar.gz
Collecting cmudict>=0.4.0
[?25l  Downloading https://files.pythonhosted.org/packages/a2/77/f009abf803286876fa99cb7bd9d1132c7b64a0b34a0360666275ce1bc733/cmudict-0.4.5-py2.py3-none-any.whl (939kB)
[K     |████████████████████████████████| 942kB 6.9MB/s 
[?25hBuilding wheels for collected packages: pronouncing
  Building wheel for pronouncing (setup.py) ... [?25l[?25hdone
  Created wheel for pronouncing: filename=pronouncing-0.2.0-py2.py3-none-any.whl size=6223 sha256=3595d389afbc0e0e21e9b115f771053b517dabbe2c6b22ea9e24c41cfd188b94
  Stored in directory: /root/.cache/pip/wheels/81/fd/e8/fb1a226f707c7e20dbed4c43f81b819d279ffd3b0e2f06ee13
Successfully built pronouncing
Installing collected packages: cmudict, pronouncing
Successfully installed cmudict-0.4.5 pronouncing-0.2.0


Generate training data from Shakespeare's works given the processed word data:

In [357]:
from urllib import request
#Read in Shakespeare text and set up training data:
spurl = "https://github.com/lakigigar/Caltech-CS155-2021/raw/main/projects/project3/data/shakespeare.txt"


#Each sequence is a single LINE of a shakespeare poem, so we predict one line at a time:
spfile = request.urlopen(spurl)
exceptions = ["t'","th'","'gainst","'greeing","'scaped","'tis","'twixt"]
statelines = []
for line in spfile:
  stateline = []
  line = line.decode('utf-8')
  line = line.strip('!.:?,-();')
  line = line.strip()
  line = line.lower()
  if line.isnumeric() or not line:
    continue
  line = line.split(" ")
  for word in line:
      word = "".join(c for c in word if c not in ('!','.',':', '?', ',', '(', ')', ';'))
      if(word[0]=="'" and word not in exceptions):
        word = word[1:]
      if(word[-1]=="'" and word not in exceptions):
        word = word[:-1]
      wordid = words.index(word)
      stateline.append(wordid)
  statelines.append(stateline)

#TRAIN THE HMM

Train the HMM on the above setup:

In [361]:
nstates = 9
nobs = nwords
N_iters = 800

rng = np.random.RandomState(1)

# Randomly initialize and normalize matrices A.
A = [[rng.random() for i in range(nstates)] for j in range(nstates)]

for i in range(len(A)):
    norm = sum(A[i])
    for j in range(len(A[i])):
        A[i][j] /= norm
    
# Randomly initialize and normalize matrix O.
O = [[rng.random() for i in range(nobs)] for j in range(nstates)]

for i in range(len(O)):
    norm = sum(O[i])
    for j in range(len(O[i])):
        O[i][j] /= norm

#Train the model:
HMM = HiddenMarkovModel(A, O)
HMM.unsupervised_learning(statelines, N_iters)

Iteration: 10
Iteration: 20
Iteration: 30
Iteration: 40
Iteration: 50
Iteration: 60
Iteration: 70
Iteration: 80
Iteration: 90
Iteration: 100
Iteration: 110
Iteration: 120
Iteration: 130
Iteration: 140
Iteration: 150
Iteration: 160
Iteration: 170
Iteration: 180
Iteration: 190
Iteration: 200
Iteration: 210
Iteration: 220
Iteration: 230
Iteration: 240
Iteration: 250
Iteration: 260
Iteration: 270
Iteration: 280
Iteration: 290
Iteration: 300
Iteration: 310
Iteration: 320
Iteration: 330
Iteration: 340
Iteration: 350
Iteration: 360
Iteration: 370
Iteration: 380
Iteration: 390
Iteration: 400
Iteration: 410
Iteration: 420
Iteration: 430
Iteration: 440
Iteration: 450
Iteration: 460
Iteration: 470
Iteration: 480
Iteration: 490
Iteration: 500
Iteration: 510
Iteration: 520
Iteration: 530
Iteration: 540
Iteration: 550
Iteration: 560
Iteration: 570
Iteration: 580
Iteration: 590
Iteration: 600
Iteration: 610
Iteration: 620
Iteration: 630
Iteration: 640
Iteration: 650
Iteration: 660
Iteration: 670
Iter

Optional save if your training took a long time:

In [362]:
spA = np.array(HMM.A)
spO = np.array(HMM.O)
np.savetxt("A_800iters_9states.txt",sp1000A)
np.savetxt("O_800iters_9states.txt",sp1000O)

Optional read to skip the training:

In [None]:
A_filename = "A_800iters_9states.txt"
O_filename = "O_800iters_9states.txt"
spA = np.loadtxt(A_filename)
spO = np.loadtxt(O_filename)

#SETUP PETRY-GENERATION CODE

below code for generating different poem types:

In [363]:
'''
********************************************************************************
NOTICE:

All these methods do not incorporate any punctuation, and predict by-line
according to necessary syllable counts.

This also always uses the maximum-possible syllable count to measure syllables,
but you'll notice the worddata dataframe I made encodes additional info (end-data,
different syllable uses, etc) in case you want to try and add additional utility.
********************************************************************************
'''

def getSyllables(emit,worddata,idx):
  if idx:
    return max(worddata["SYL1"].values[emit],worddata["SYL2"].values[emit])
  widx = worddata["WORDS"].tolist().index(emit)
  return getSyllables(widx,worddata,True)

def makeline(A,O,worddata,syllables,start=None,backwards=False,capitalize=False,concat=False):
  words = worddata["WORDS"].tolist()
  L = len(A[0])
  D = len(O[0])
  generate = True
  while generate:
    sylcount = 0
    state = np.random.randint(L)
    emission = []
    if start is not None:
      emission = [start]
      sylcount = getSyllables(start,worddata,False)
    while sylcount<syllables:
      emitweights = O[state,:]
      emit = np.random.choice(np.arange(D),p=emitweights)
      sylcount = sylcount+getSyllables(emit,worddata,idx=True)
      emission.append(words[emit])
      stateweights = A[state,:]
      if backwards:
        stateweights = A[:,state]/sum(A[:,state])
      state = np.random.choice(np.arange(L),p=stateweights)
    count = sum([getSyllables(x,worddata,idx=False) for x in emission])
    if count == syllables:
      generate = False
  if backwards:
    emission.reverse()
  if capitalize:
    emission[0] = emission[0].capitalize()
  line = emission
  if concat:
    line = " ".join(emission)
  return line

def makerhyme(A,O,worddata,rhymedict,syllables,capitalize=False):
  lines = []
  generate = True
  l1 = None
  while generate:
    l1 = makeline(A,O,worddata,syllables,capitalize=capitalize)
    if rhymedict[l1[-1]]:
      generate = False
  rhymeword = np.random.choice(rhymedict[l1[-1]])
  l2 = makeline(A,O,worddata,syllables,start=rhymeword,backwards=True,capitalize=capitalize)
  lines.append(" ".join(l1))
  lines.append(" ".join(l2))
  return lines

def makeSonnet(A, O, worddatta, rhymedict):
  couplets = []
  sonnet = ""
  for i in range(3):
    r1 = makerhyme(A,O,worddata,rhymedict,syllables = 10, capitalize = True)
    r2 = makerhyme(A,O,worddata,rhymedict,syllables = 10, capitalize = True)
    sonnet = sonnet + r1[0] + "\n"
    sonnet = sonnet + r2[0] + "\n"
    sonnet = sonnet + r1[1] + "\n"
    sonnet = sonnet + r2[1] + "\n\n"
  c = makerhyme(A,O,worddata,rhymedict, syllables = 10, capitalize=True)
  sonnet = sonnet + "\t"+c[0] +"\n\t"+c[1]
  return sonnet

def makeHaiku(A,O,worddata):
  haiku = makeline(A,O,worddata,syllables=5, concat=True)+"\n"
  haiku = haiku+makeline(A,O,worddata, syllables=7, concat=True)+"\n"
  haiku = haiku+makeline(A,O,worddata, syllables=5, concat=True)+"\n"
  return haiku
       

In [369]:
spA = np.array(HMM.A)
spO = np.array(HMM.O)

print("************  EXAMPLE SONNET ***************")
print(makeSonnet(spA, spO, worddata, rhymedict2))
print("\n\n************  EXAMPLE HAIKU ****************")
print(makeHaiku(spA,spO,worddata))

************  EXAMPLE SONNET ***************
Child lengths my thy true make my to lie art
Bait in the or black can up die more then
Thee calls but but when he i what me dart
Faculty he have false for might wont pen

Pride slain cold shadow fair guilt worms why sweet
Pitch ornament sight hate of did i spend
Be when doth against a greater fell heat
In number knowing part which green commend

Weak and store see sworn of those filching some
Theft and sin the love heir ten in and main
Time's salve broken of dearer whilst till come
I thinks against this he blessed showers slain

	Sin once to the dye in her own glory
	Second frantic-mad coming i story

************  EXAMPLE HAIKU ****************
ever for to edge
thy to some so mine field of
enclose again and



#NOW FOR THE NEURAL NET
Setup for neural net training:

In [371]:
#Pulls heavily from the Keras LSTM online tutorial:
# https://keras.io/examples/generative/lstm_character_level_text_generation/

from tensorflow import keras
from tensorflow.keras import layers

#This time we'll include punctuation:
spfile = request.urlopen(spurl)
text =  ""
for line in spfile:
  stateline = []
  line = line.decode('utf-8')
  #line = line.strip('!.:?,-();')
  line = line.strip()
  line = line.lower()
  if line.isnumeric() or not line:
    continue
  text = text + line + "\n"
print("Corpus length:", len(text))

#Tutorial setup
chars = sorted(list(set(text)))
tchars = len(chars)
print("Total chars:", tchars)
char_indices = dict((c, i) for i, c in enumerate(chars))
indices_char = dict((i, c) for i, c in enumerate(chars))

maxlen = 40
step = 5
sentences = []
next_chars = []
for i in range(0, len(text) - maxlen, step):
    sentences.append(text[i : i + maxlen])
    next_chars.append(text[i + maxlen])
print("Number of sequences:", len(sentences))

x = np.zeros((len(sentences), maxlen, len(chars)), dtype=np.bool)
y = np.zeros((len(sentences), len(chars)), dtype=np.bool)
for i, sentence in enumerate(sentences):
    for t, char in enumerate(sentence):
        x[i, t, char_indices[char]] = 1
    y[i, char_indices[next_chars[i]]] = 1

Corpus length: 93674
Total chars: 38
Number of sequences: 18727


Now train the neural net:

In [374]:
model = keras.Sequential(
    [
        keras.Input(shape=(maxlen, tchars)),
        layers.LSTM(150),
        layers.Dense(len(chars), activation="softmax"),
    ]
)
optimizer = keras.optimizers.RMSprop(learning_rate=0.02)
model.compile(loss="categorical_crossentropy", optimizer=optimizer)

#Training parameters
epochs = 60
batch_size = 150
model.fit(x, y, batch_size=batch_size, epochs=epochs)

Epoch 1/60
Epoch 2/60
Epoch 3/60
Epoch 4/60
Epoch 5/60
Epoch 6/60
Epoch 7/60
Epoch 8/60
Epoch 9/60
Epoch 10/60
Epoch 11/60
Epoch 12/60
Epoch 13/60
Epoch 14/60
Epoch 15/60
Epoch 16/60
Epoch 17/60
Epoch 18/60
Epoch 19/60
Epoch 20/60
Epoch 21/60
Epoch 22/60
Epoch 23/60
Epoch 24/60
Epoch 25/60
Epoch 26/60
Epoch 27/60
Epoch 28/60
Epoch 29/60
Epoch 30/60
Epoch 31/60
Epoch 32/60
Epoch 33/60
Epoch 34/60
Epoch 35/60
Epoch 36/60
Epoch 37/60
Epoch 38/60
Epoch 39/60
Epoch 40/60
Epoch 41/60
Epoch 42/60
Epoch 43/60
Epoch 44/60
Epoch 45/60
Epoch 46/60
Epoch 47/60
Epoch 48/60
Epoch 49/60
Epoch 50/60
Epoch 51/60
Epoch 52/60
Epoch 53/60
Epoch 54/60
Epoch 55/60
Epoch 56/60
Epoch 57/60
Epoch 58/60
Epoch 59/60
Epoch 60/60


<tensorflow.python.keras.callbacks.History at 0x7f31536e7e10>

Poem generation code for neural net:


In [375]:
def sample(preds, temperature=1.0):
    preds = np.asarray(preds).astype("float64")
    preds = np.log(preds) / temperature
    exp_preds = np.exp(preds)
    preds = exp_preds / np.sum(exp_preds)
    probas = np.random.multinomial(1, preds, 1)
    return np.argmax(probas)

def generatepoem(model, seed, temp, char_indices, indices_char, numlines=14):
  oseed = seed
  tchars = model.layers[-1].output_shape[1]
  maxlen = model.layers[0].input_shape[1]
  generated = ""
  count = 0
  if '\n' in oseed:
    count = 1
  while count < numlines:
    x_pred = np.zeros((1, maxlen, tchars))
    for t, char in enumerate(seed):
      x_pred[0, t, char_indices[char]] = 1.0
    preds = model.predict(x_pred, verbose=0)[0]
    next_index = sample(preds, temp)
    next_char = indices_char[next_index]
    if next_char == '\n':
      count = count+1
      print(count, "of", numlines, "complete")
    seed = seed[1:] + next_char
    generated += next_char
  return oseed+generated

Generate poem at temperatures 1.5, 0.75, 0.25 with provided seed:

In [376]:
seed = "shall i compare thee to a summer's day?\n"
numlines = 14
temps = [1.5, 0.75, 0.25]
for temp in temps:
  poem = generatepoem(model, seed, temp, char_indices, indices_char, numlines)
  print("\n************GENERATED POEM (T=", temp, ")*********\n")
  print(poem)
  print("\n")

2 of 14 complete
3 of 14 complete
4 of 14 complete
5 of 14 complete
6 of 14 complete
7 of 14 complete
8 of 14 complete
9 of 14 complete
10 of 14 complete
11 of 14 complete
12 of 14 complete
13 of 14 complete
14 of 14 complete

************GENERATED POEM (T= 1.5 )*********

shall i compare thee to a summer's day?
nof horce mu doo artus :endwsly theecn honace duu,
bnikert, dost driincnean th, sed ouk of 
yol'n'timo awam dumed nn look evelickct whoechamdrt:
breiled i w nowohinl yod s th aintsanow rippireed,
e wos lamy eyc ensensepus bun forse alou.
fien noond somfoly fwosoormo, you ors or 
rayn, oncenle und farntupedruntrarst misstareed's woh thyt for wtrmperk timee.
he,
woided
sse, ie timek my yat'louef,
ward lold-lork pertrinnerence:ivespingn
so gueuk madowrl, i meve auchorinifelt of nropetxece shaie,
and nhat shomilatied love i yeeaigrny,



2 of 14 complete
3 of 14 complete
4 of 14 complete
5 of 14 complete
6 of 14 complete
7 of 14 complete
8 of 14 complete
9 of 14 complete
10 of 14 c

#VISUALIZATIONS

some ideas:
* table of most probable words per state for HMM
* word cloud
* visualize transition matrix (state graphs and/or matrix heatmaps)