In [None]:
import torch
import torch.optim as optim
import numpy as np
import pandas as pd
import torch.nn as nn
import torch.nn.functional as F
from tqdm.notebook import tqdm
import random
import sys
from scipy.special import softmax

from HMM import unsupervised_HMM
from HMM_helper import (
    sample_sentence,
)
import re
def parse_observations(text):
    """Encode a text as integer sequences, one sequence per non-blank line.

    Each whitespace-separated token is stripped of every character that is
    neither a word character nor a newline, lower-cased, and replaced by an
    integer id. Ids are handed out in order of first appearance, starting at
    1; id 0 is reserved for the newline token, which terminates every
    encoded line.

    Returns:
        A tuple ``(obs, obs_map)`` where ``obs`` is the list of encoded lines
        and ``obs_map`` maps each normalised token (and '\\n') to its id.
    """
    newline_token = '\n'
    vocab = {newline_token: 0}
    encoded_lines = []

    for raw_line in text.split('\n'):
        tokens = raw_line.split()
        if not tokens:
            # Blank or whitespace-only lines contribute nothing.
            continue

        codes = []
        for token in tokens:
            cleaned = re.sub(r'[^\w\n]', '', token).lower()
            # A new token receives the next free id, which always equals
            # the current vocabulary size.
            codes.append(vocab.setdefault(cleaned, len(vocab)))

        # Terminate the encoded line with the newline id.
        codes.append(vocab[newline_token])
        encoded_lines.append(codes)

    return encoded_lines, vocab

In [None]:
# Read the corpus line by line, then lower-case and concatenate the lines
# into a single string.
with open("shakespeare.txt", "r") as f:
    data = f.readlines()
text = ''.join(raw_line.lower() for raw_line in data)
# Preview the first 200 characters as the cell output.
text[:200]

In [None]:
# Encode the corpus: `obs` is a list of integer-encoded lines (each ending
# with the newline id 0) and `obs_map` maps every normalised word to its id.
obs, obs_map = parse_observations(text)

In [None]:
# Hand-tokenised seed phrase.
my_split = ['shall', 'i', 'compare', 'thee', 'to', 'a', "summer's", 'day']
# Normalise each token exactly as parse_observations does, look up its id,
# and close the phrase with the newline id.
my_obs = [obs_map[re.sub(r'[^\w\n]', '', token).lower()] for token in my_split]
my_obs.append(obs_map['\n'])

In [None]:
# Display the integer-encoded seed phrase.
my_obs

In [None]:
# Reverse lookup table: integer id -> word.
inv_obs_map = {index: word for word, index in obs_map.items()}


In [None]:
# Flatten the per-line encodings into one continuous stream of word ids.
# NOTE: this rebinds `text`, which held the raw corpus string up to here.
text = [token for line_codes in obs for token in line_codes]

In [None]:
# Number of consecutive word ids in each context window fed to the model.
max_len = 8
# Slide a window over the token stream: each sample is `max_len` consecutive
# ids and its label is the id that immediately follows the window.
# Window starts run from 0 to len(text) - max_len - 1 inclusive, so every
# complete window -- including the very first and the very last -- is used.
x_data = []
y_data = []
for i in range(len(text) - max_len):
    x_data.append(text[i:i + max_len])
    y_data.append(text[i + max_len])

In [None]:
def sample(preds, temperature=1.0):
    """Draw one index from a probability array after temperature scaling.

    The probabilities are re-weighted as ``p ** (1 / temperature)`` and
    renormalised, then a single index is drawn from the result. Temperatures
    below 1 sharpen the distribution towards its largest entry; temperatures
    above 1 flatten it.

    Args:
        preds: 1-D array-like of non-negative weights. A common scale factor
            cancels out in the renormalisation.
        temperature: positive scaling factor applied to the log-weights.

    Returns:
        The sampled index, as returned by ``np.argmax`` on the one-hot draw.
    """
    preds = np.asarray(preds).astype('float64')
    scaled = np.log(preds) / temperature
    # Shift so the largest entry is exactly 0 before exponentiating. Without
    # the shift, a small temperature makes every exp() underflow to 0, the
    # normalisation becomes 0/0 and multinomial receives NaNs.
    scaled = scaled - np.max(scaled)
    exp_preds = np.exp(scaled)
    preds = exp_preds / np.sum(exp_preds)
    # One multinomial trial yields a one-hot row; argmax recovers its index.
    probas = np.random.multinomial(1, preds, 1)
    return np.argmax(probas)

In [None]:
class LanguagePredictor(nn.Module):
    """Next-word classifier: embedding -> bidirectional LSTM -> linear layer.

    The LSTM outputs of every position in the window are concatenated and
    passed through a single linear layer that scores each output class.

    Args:
        vocab_size: number of rows in the embedding table.
        embedding_dim: size of each word embedding.
        output_size: number of classes scored by the final linear layer.
        hidden_dim: LSTM hidden size per direction (default 128).
        seq_len: number of word ids per input window. When omitted, the
            notebook-level ``max_len`` is used, so existing three-argument
            calls keep working.
    """

    def __init__(self, vocab_size, embedding_dim, output_size, hidden_dim=128, seq_len=None):
        super(LanguagePredictor, self).__init__()
        if seq_len is None:
            seq_len = max_len
        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.seq_len = seq_len
        self.embeddings = nn.Embedding(vocab_size, embedding_dim)
        # batch_first=True is required: the inputs are (batch, seq_len), so
        # the embeddings are (batch, seq_len, embedding_dim). With the default
        # layout the LSTM would treat the batch axis as time and run its
        # recurrence across unrelated samples.
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, bidirectional=True, batch_first=True)
        # Two directions, hidden_dim features each, for every window position.
        self.linear1 = nn.Linear(hidden_dim * seq_len * 2, output_size)

    def forward(self, inputs):
        """Return unnormalised class scores of shape (batch, output_size).

        Args:
            inputs: integer tensor of word ids with shape (batch, seq_len).
        """
        embeds = self.embeddings(inputs)
        lstm_out, _ = self.lstm(embeds)
        # Flatten each sample's (seq_len, 2 * hidden_dim) block into one
        # vector; a wrong window length then fails loudly in the linear layer
        # instead of being silently folded into the batch axis.
        flat = lstm_out.reshape(lstm_out.size(0), -1)
        return self.linear1(flat)
# The embedding table gets one extra row because input ids are shifted by +1
# before being fed to the model; the output layer scores the unshifted ids.
# Use the GPU when one is available and fall back to the CPU otherwise.
model = LanguagePredictor(len(obs_map) + 1, 30, len(obs_map)).to(
    'cuda' if torch.cuda.is_available() else 'cpu'
)
loss_function = nn.CrossEntropyLoss()


In [None]:
optimizer = optim.Adam(model.parameters(), lr=0.0001)
# The first 18000 windows are used for training, the remainder for validation.
indices = np.arange(len(x_data))[:18000]
val_indices = np.arange(len(x_data))[18000:]
batch_size = 256
# Build batches on whichever device the model already lives on.
device = next(model.parameters()).device


def _make_batch(batch_indices):
    """Return (inputs, targets) tensors on `device` for the given window indices."""
    # Input ids are shifted by +1, so no real word maps to row 0 of the
    # embedding table; targets keep their original ids.
    inputs = np.array([x_data[j] for j in batch_indices], dtype=np.int64) + 1
    targets = np.array([y_data[j] for j in batch_indices], dtype=np.int64)
    return torch.from_numpy(inputs).to(device), torch.from_numpy(targets).to(device)


for epoch in tqdm(range(300)):
    # ---- training phase ----
    np.random.shuffle(indices)
    model.train()
    epoch_loss = 0
    # Each batch is built from its own slice of indices, so no samples are
    # carried over between the training and validation phases or between
    # epochs. The final slice may be shorter than batch_size.
    for start in tqdm(range(0, len(indices), batch_size), desc='train', leave=False):
        inputs, targets = _make_batch(indices[start:start + batch_size])

        optimizer.zero_grad()
        log_probs = model(inputs)
        loss = loss_function(log_probs, targets)
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

    # ---- validation phase ----
    model.eval()
    epoch_val_loss = 0
    # No gradients are needed for evaluation; skipping them saves memory.
    with torch.no_grad():
        for start in tqdm(range(0, len(val_indices), batch_size), desc='val', leave=False):
            inputs, targets = _make_batch(val_indices[start:start + batch_size])
            epoch_val_loss += loss_function(model(inputs), targets).item()

    # Both numbers are sums of per-batch mean losses for the epoch.
    print(epoch_loss, epoch_val_loss)

In [None]:
# Put the model in evaluation mode before generating text.
model.eval()
def sentence_to_text(sentence):
    """Decode a sequence of word ids into text, each word followed by one space."""
    return ''.join(inv_obs_map[word_id] + ' ' for word_id in sentence)
        
# Random window start inside the corpus; only needed for the alternative
# seed mentioned in the loop below.
start_index = random.randint(0, len(text) - max_len - 1)
# Run generation on whichever device the model lives on.
gen_device = next(model.parameters()).device
for diversity in [0.01, 0.05, 0.1, 0.2, 0.5]:
    print('$$$$$$$$$$$$$$$$$$$$$$$')
    print(diversity)
    print('$$$$$$$$$$$$$$$$$$$$$$$')

    generated = ''
    # Seed with the last `max_len` ids of the hand-written phrase.
    # Alternative seed: text[start_index: start_index + max_len]
    x = my_obs[-max_len:]
    sentence = sentence_to_text(x)
    generated += sentence
    print(sentence)

    for _ in range(200):
        # Shift ids by +1 to match the encoding used during training; slots
        # beyond len(x) stay 0.
        x_arr = np.zeros((1, max_len), dtype=np.int64)
        x_arr[0, :len(x)] = np.asarray(x, dtype=np.int64) + 1
        # Inference only: no autograd graph is needed.
        with torch.no_grad():
            logits = model(torch.from_numpy(x_arr).to(gen_device))
        # Convert to float64 before adding the floor: 1e-90 is far below the
        # float32 range and would round to zero on a float32 array, leaving
        # exact zeros for the log taken inside sample().
        preds = softmax(logits.cpu().numpy().astype('float64')) + 1e-90
        preds = preds / preds.sum()
        next_index = int(sample(preds.reshape(-1), diversity))
        next_char = inv_obs_map[next_index]

        # Slide the context window forward by one word.
        x = x[1:]
        x.append(next_index)
        generated += ' ' + next_char

        sys.stdout.write(' ' + next_char)
        sys.stdout.flush()
    print()