## Importing Libraries
Importing required libraries

In [1]:
# Importing libraries required for handling the data
import pandas as pd
import numpy as np
import random
import time
import string
import re

# Importing libraries required for importing the data
from google.colab import files
import io

Uploading the Data file

In [2]:
uploaded = files.upload()

Saving Shakespeare_data.csv to Shakespeare_data.csv


##Pre-Processing the dataset

Cleaning the dataset, Dropping rows with NA values and resetting index

In [3]:
path = io.BytesIO(uploaded['Shakespeare_data.csv'])
df = pd.read_csv(path)
df = df.dropna()
df = df.reset_index()
df

Unnamed: 0,index,Dataline,Play,PlayerLinenumber,ActSceneLine,Player,PlayerLine
0,3,4,Henry IV,1.0,1.1.1,KING HENRY IV,"So shaken as we are, so wan with care,"
1,4,5,Henry IV,1.0,1.1.2,KING HENRY IV,"Find we a time for frighted peace to pant,"
2,5,6,Henry IV,1.0,1.1.3,KING HENRY IV,And breathe short-winded accents of new broils
3,6,7,Henry IV,1.0,1.1.4,KING HENRY IV,To be commenced in strands afar remote.
4,7,8,Henry IV,1.0,1.1.5,KING HENRY IV,No more the thirsty entrance of this soil
...,...,...,...,...,...,...,...
105147,111390,111391,A Winters Tale,38.0,5.3.179,LEONTES,"Is troth-plight to your daughter. Good Paulina,"
105148,111391,111392,A Winters Tale,38.0,5.3.180,LEONTES,"Lead us from hence, where we may leisurely"
105149,111392,111393,A Winters Tale,38.0,5.3.181,LEONTES,Each one demand an answer to his part
105150,111393,111394,A Winters Tale,38.0,5.3.182,LEONTES,Perform'd in this wide gap of time since first


Focussing on one play as limited resources available

In [4]:
df_onePlay = df.loc[df['Play']=='Henry IV']
print(df_onePlay.shape)
text_corpus = df_onePlay['PlayerLine']
text_corpus

(3044, 7)


0               So shaken as we are, so wan with care,
1           Find we a time for frighted peace to pant,
2       And breathe short-winded accents of new broils
3              To be commenced in strands afar remote.
4            No more the thirsty entrance of this soil
                             ...                      
3039    To fight with Glendower and the Earl of March.
3040       Rebellion in this land shall lose his sway,
3041           Meeting the cheque of such another day:
3042          And since this business so fair is done,
3043         Let us not leave till all our own be won.
Name: PlayerLine, Length: 3044, dtype: object

## Defining Hidden Markov Model

HMM Parameters:

*   Transition Matrix (T) -  This is the joint probability matrix that tells us the probability of transition from one hidden state to the other state.
*   Emission Matrix (E) - This is the probability distribution for the observations given the hidden states.
*   Initial Probabilities (pi) - These are the initial probabilities for all the hidden states.

Used forward backward algorithm, specifically Baum Welch algorithm to compute the optimal Transition matrix and Emission Matrix.

Reference: https://medium.com/analytics-vidhya/baum-welch-algorithm-for-training-a-hidden-markov-model-part-2-of-the-hmm-series-d0e393b4fb86

All of the matrix and initial probabilities are random in the first iteration, then using Baum Welch algorithm for forward backward Expectation Maximization, there are 3 steps as follows:

For expectation step there are 2 phases:
* Forward phase: with the observations in this step we propogate the information forward, this computes the alpha function values.
* Backward Phase: The backward phase enables us to propogate the information (error) backwards and correct the model itsel. This computes the beta function values.

The Maximization step has one phase:
* Update Phase: The update phase takes the alpha and the beta values we calculated in the expectation steps and caluclates the gamma and xi related to them. With the gamma and xi we maximize the HMM parameters, i.e. Transition matrix (T), Emission Matrix (E), and the initial probabilities for the hidden states (pi).

The train funciton just call these phases function in order for all the observations, and converges when the difference between the old copy and the updated copy crosses a threshold or maximum iterations are reached.


In [5]:
class HMM:
  def __init__(self,n_hidden_states, obs_states,init_probs = None):
    self.hidden_states = [i for i in range(n_hidden_states)]
    self.n_hidden_states = n_hidden_states
    self.obs_states = obs_states
    self.n_obs_states = len(obs_states)
    # Hidden variable probability transition stochastic matrix
    self.T = np.random.rand(len(self.hidden_states),len(self.hidden_states))
    # Observation given hidden probabilty emission matrix
    self.E = np.random.rand(len(self.hidden_states),self.n_obs_states)
    self.initial_probabilities(len(self.hidden_states))
    for i in range(len(self.hidden_states)):
      self.T[i,:] /= self.T[i,:].sum(axis=0) # Normalizing
      self.E[i,:] /= self.E[i,:].sum(axis=0) # Normalizing
    self.word_map = self.word_map(self.obs_states)
    self.index_map = self.index_map()

  def initial_probabilities(self,n_hidden_states):
    size = n_hidden_states # Total number of states
    randomUniformProbs = np.random.rand(size) / (size**2) + 1 / size # Random distribution between [0,1] for total number of states
    randomUniformProbs /= randomUniformProbs.sum(axis=0) # Normalizing
    self.pi = np.array(randomUniformProbs).reshape(1,-1)

  def forwardPhase(self,observation):
    # total alphas, one for every hidden state upto time k (k states)
    alpha = np.zeros((len(self.hidden_states), len(observation)))
    alpha[:,0] = self.pi[:] * self.E[:,observation[0]]

    for k in range(len(observation) - 1):
        for i in range(self.n_hidden_states):
            sum = 0
            for j in range(self.n_hidden_states):
                sum += alpha[j,k] * self.T[j,i]
            alpha[i,k+1] = self.E[i,observation[k+1]] * sum
    
    return alpha

  def backwardPhase(self,observation):
    # The beta function is defined as the conditional probability of the observed data 
    # from time k+1 given the state at time k
    beta = np.zeros((len(self.hidden_states), len(observation)))
    beta[:,-1] = 1

    for k in range(len(observation) - 2, -1, -1):
      for i in range(self.n_hidden_states):
        for j in range(self.n_hidden_states):
          beta[i,k] += beta[j,k+1] * self.T[i,j] * self.E[j,observation[k+1]]
    return beta

  def updatePhase(self, alpha, beta, observation):
    gamma = np.zeros((len(self.hidden_states), len(observation)))

    for k in range(len(observation)):
      denom = 0
      for i in range(len(self.hidden_states)):
        gamma[i,k] = alpha[i,k] * beta[i,k]
        denom += gamma[i,k]
      gamma[:,k] = gamma[:,k] / denom

    xi = np.zeros((len(self.hidden_states), len(self.hidden_states), len(observation)))
    for k in range(len(observation)-1):
      denom = 0
      for i in range(self.n_hidden_states):
        for j in range(self.n_hidden_states):
          xi[i,j,k] = alpha[i,k] * self.T[i,j] * beta[j,k+1] * self.E[j,observation[k+1]]
          denom += xi[i,j,k]
      xi[:,:,k] = xi[:,:,k] / denom
    return gamma, xi

  def word_map(self, uniqueWords):
    map = {}
    i = 0
    for word in uniqueWords:
      if word not in map:
        map[word] = i
        i += 1
    return map

  def index_map(self):
    map = {}
    for word in self.word_map:
      index = self.word_map[word]
      map[index] = word
    return map

  def decodeText(self, index):
    if index not in self.index_map:
      return self.index_map[0]
    return self.index_map[index]

  def encodeText(self, text):
    indexedWords = []
    for seq in text:
      seq_by_index = []
      for word in seq:
        seq_by_index.append(self.word_to_index(word))
      indexedWords.append(seq_by_index)
    return indexedWords

  def word_to_index(self, word):
    if word not in self.word_map: #for unknown words
      return 0
    return self.word_map[word]

  def getUniques(self, observations):   
    uniques = []
    for sentence in observations:
      words = re.split('\s+', sentence)
      for word in words:
        if word not in uniques:
          uniques.append(word)
    return uniques

  def __repr__(self):
    return "HMM \n-> Hidden states: {} \n-> observables: {}. \npi = {}\nT = {}\nE = {}".format(
        self.hidden_states, self.obs_states, self.pi, self.T, self.E)
    
  def trainModel(self, textCorpus, maxIter=1000, threshold=0.0001):
    uniqueWords = self.getUniques(textCorpus)
    indexedWords = self.encodeText(uniqueWords)
    for it in range(maxIter):
      start = time.time()
      print("Iteration {}".format(it))
      # save old stuff
      old_T = np.copy(self.T)
      old_E = np.copy(self.E)
      old_pi = np.copy(self.pi)
      gammas = []
      xis = []

      for seq, n in zip(indexedWords, range(len(indexedWords))):
        if n % 50 == 0:
          print("Line {}".format(n))
        alpha = self.forwardPhase(seq)
        beta = self.backwardPhase(seq)
        g,xi = self.updatePhase(alpha,beta,seq)
        gammas.append(g)
        xis.append(xi)

      print("Expectation-step complete")

      print("maximizing initial probabilities (pi)")
      for i in range(len(self.hidden_states)):
        temp_pi = 0
        for T in range(len(textCorpus)):
          temp_pi += gammas[T][i,0]
        self.pi[0,i] = temp_pi
      self.pi = self.pi / np.sum(self.pi)

      # a
      print("maxmizing Transition Probabilities (T)")
      for i in range(self.n_hidden_states):
        denom = 0
        for T in range(len(indexedWords)):
          denom += np.sum(gammas[T][i,0:-2])

        for j in range(self.n_hidden_states):
          numer = 0
          for T in range(len(indexedWords)):
            numer += np.sum(xis[T][i,j,0:-2])
          self.T[i,j] = numer / denom

      # b
      print("maximizing Emission Matrix (E)")
      self.E = np.zeros((self.n_hidden_states, self.n_obs_states))
      for i in range(self.n_hidden_states):
        denom = 0
        for seq, T in zip(indexedWords, range(len(indexedWords))):
          denom += np.sum(gammas[T][i,:])
          for word, t in zip(seq, range(len(seq))):
            self.E[i, word] += gammas[T][i,t]

        self.E[i,:] = self.E[i,:] / denom

      print("Maximization-step Complete")
      diffT = np.absolute(np.linalg.norm(self.T) - np.linalg.norm(old_T))
      diffE = np.absolute(np.linalg.norm(self.E) - np.linalg.norm(old_E))
      diffPi = np.absolute(np.linalg.norm(self.pi) - np.linalg.norm(old_pi))
      total_diff = diffT + diffE + diffPi
      print("Total diff: {}".format(total_diff))
      print("Total time: {}".format(time.time() - start))
      if total_diff <= threshold:
        break

    return self.T, self.E, self.pi

  def genText(self, length):
    state = np.random.choice(range(self.n_hidden_states), p=self.pi[0])
    s = ''
    for i in range(length):
      s += self.decodeText(np.random.choice(list(self.E[state]), p=list(self.E[state]))) + ' ' # Emission in the state
      state = np.random.choice(range(self.n_hidden_states), p=self.T[state]) # Transition from state to state

    print(s)
  
  def viterbi(self,sequence,n):
    # s = re.split('\s+', sequence)
    s = self.encodeText(sequence)

    t1 = np.zeros((self.n_hidden_states, len(s)))
    t2 = np.zeros((self.n_hidden_states, len(s)))

    for i in range(self.n_hidden_states):
      t1[i, 0] = self.pi[0,i] * self.E[i, s[0]]
      t2[i, 0] = 0

    for i in range(1, len(s)):
      for j in range(self.n_hidden_states):
        t = t1[:, i - 1] * self.T[:][j] * self.E[j, s[i]]
        t1[j][i] = np.max(t)
        t2[j][i] = np.argmax(t)

    z = np.zeros(len(s))
    x = np.zeros(len(s))

    z[-1] = np.argmax(t1[:, -1])
    x[-1] = int(z[-1])

    for i in range(len(s) - 1, 0, -1):
      z[i - 1] = t2[int(z[i]), i]
      x[i - 1] = z[i - 1]

    state = x[-1]

    s = ''

    for i in range(n):
      state = np.random.choice(range(self.n_hidden_states), p=self.T[int(state)])
      s += self.decodeText(np.random.choice(list(self.E[int(state)]), p=list(self.E[int(state)]))) + ' '

    print(s)


HMM training and results

Provided a limited text corpus.

In [6]:
hmm = HMM(10,text_corpus[1:100])
T,E,pi = hmm.trainModel(text_corpus[1:100])

Iteration 0
Line 0
Line 50
Line 100
Line 150
Line 200
Line 250
Line 300
Line 350
Line 400
Line 450
Expectation-step complete
maximizing initial probabilities (pi)
maxmizing Transition Probabilities (T)
maximizing Emission Matrix (E)
Maximization-step Complete
Total diff: 3.0774593964091963
Total time: 1.211127519607544
Iteration 1
Line 0
Line 50
Line 100
Line 150
Line 200
Line 250
Line 300
Line 350
Line 400
Line 450
Expectation-step complete
maximizing initial probabilities (pi)
maxmizing Transition Probabilities (T)
maximizing Emission Matrix (E)
Maximization-step Complete
Total diff: 2.248201624865942e-14
Total time: 1.1606712341308594


Generating text using the trained HMM model using Baum Welch algorithm.

In [7]:
hmm.genText(50)

Find we a time for frighted peace to pant, Find we a time for frighted peace to pant, Find we a time for frighted peace to pant, Find we a time for frighted peace to pant, Find we a time for frighted peace to pant, Find we a time for frighted peace to pant, Find we a time for frighted peace to pant, Find we a time for frighted peace to pant, Find we a time for frighted peace to pant, Find we a time for frighted peace to pant, Find we a time for frighted peace to pant, Find we a time for frighted peace to pant, Find we a time for frighted peace to pant, Find we a time for frighted peace to pant, Find we a time for frighted peace to pant, Find we a time for frighted peace to pant, Find we a time for frighted peace to pant, Find we a time for frighted peace to pant, Find we a time for frighted peace to pant, Find we a time for frighted peace to pant, Find we a time for frighted peace to pant, Find we a time for frighted peace to pant, Find we a time for frighted peace to pant, Find we a t

Using Viterbi algorithm to generate the best path given a sequence.

In [8]:
hmm.viterbi("to be ",1)

Find we a time for frighted peace to pant, 


**Conclusion:** The Hidden Markov Model (HMM) performs well on these type of problems, from the results I can see that the model may perform good with better/more training. Currently with 10 hidden states and only 100 lines of observations from the whole text corpus the HMM model does not have much idea about how shakespeare writes. It was only able to learn one line from the given observations and hence performs bad currently. I am hoping with more training, and with more hidden states the results will improve for both generating random sequences, and for Viterbi (generating best path for given sequence).

**Future work:** I considered playing around with the text corpus with frequencies of the words, with some threshold for frequency, if the words appear more than the threshold count it as a hidden state. I did start with the implementation for this but could not finish it as had some difficulties at the end while training (laptop crashed, few functionalities deleted)