In [1]:
import tensorflow as tf
import torch
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt
import re
import time
import math


# Download the Shakespeare corpus through the Keras file helper and keep the
# local path it returns.
path_to_file = tf.keras.utils.get_file('shakespeare.txt', 'https://storage.googleapis.com/download.tensorflow.org/data/shakespeare.txt')
# Read the raw bytes and decode them explicitly as UTF-8. The context manager
# closes the file handle deterministically instead of leaving it open until
# garbage collection.
with open(path_to_file, 'rb') as corpus_file:
    text = corpus_file.read().decode(encoding='utf-8')
# length of text is the number of characters in it
print('Length of text: {} characters'.format(len(text)))

Length of text: 1115394 characters


In [2]:
# Number of whitespace-separated tokens in the raw corpus.
len(text.split())

202651

In [3]:
# Normalize punctuation before tokenizing on whitespace.
# Raw-string patterns are used so the regex engine, not the Python string
# parser, interprets the backslashes (non-raw '\.' / '\!' are invalid string
# escapes and trigger warnings on recent Python versions).

# Sentence terminators become an explicit end-of-sentence token.
text = re.sub(r'[.!?]', ' <eos>', text)
# Commas and semicolons are dropped entirely.
text = re.sub(r'[,;]', '', text)
# Colons are split off into their own token (e.g. "ANTONIO:" -> "ANTONIO :").
text = re.sub(r':', ' :', text)
# Double dashes are treated as sentence breaks as well. This runs last so a
# "--" formed by removing a comma/semicolon is still caught, as before.
text = re.sub(r'--', ' <eos>', text)

In [4]:
# Collapse every run of whitespace (including newlines) into a single space.
text = ' '.join(text.split())

In [5]:
# Show only the beginning of the cleaned corpus; echoing the full string
# (over a million characters) floods the notebook output.
text[:500]



In [6]:
# Lower-case every whitespace-separated token and re-join with single spaces.
text_lower = ' '.join(token.lower() for token in text.split())
# Preview only the first characters rather than echoing the whole corpus.
text_lower[:500]



In [None]:
# Count how often each token occurs in the lower-cased corpus.
# The corpus is split exactly once; re-splitting the whole string inside the
# loop (as an index-based loop would) makes the pass quadratic in corpus size.
word_counts = {}

for word in text_lower.split():
    word_counts[word] = word_counts.get(word, 0) + 1

In [None]:
# Flatten the count dictionary into (word, count) tuples, in insertion order.
word_counts_list = list(word_counts.items())

In [None]:
# Rank the (word, count) pairs from most to least frequent.
ranked_pairs = sorted(word_counts_list, key=lambda pair: pair[1], reverse=True)
# Report how many distinct words the corpus contains.
print(len(ranked_pairs))
# Keep only the 5000 most frequent words as the vocabulary.
ordered_list = ranked_pairs[:5000]
ordered_list

In [None]:
# Assign each kept word its frequency rank as an integer id.
word2idx = {word: rank for rank, (word, _count) in enumerate(ordered_list)}
print(len(word2idx))
# Reserve the next free id for the out-of-vocabulary token.
word2idx['<unk>'] = len(word2idx)
print(len(word2idx))

In [None]:
# Encode the corpus as parallel sequences of words and vocabulary ids.
# Words outside the vocabulary are replaced by '<unk>' so the token stream
# keeps its original length and word order; dropping them would make
# unrelated words look adjacent to the language model.
unk_idx = word2idx['<unk>']

text_words = []
text_idx = []

for word in text_lower.split():
    if word in word2idx:
        text_words.append(word)
        text_idx.append(word2idx[word])
    else:
        text_words.append('<unk>')
        text_idx.append(unk_idx)

text_words = ' '.join(text_words)

In [None]:
# Number of whitespace-separated tokens in the encoded word stream.
len(text_words.split())

In [None]:
# Store the token ids as an int64 tensor, the dtype nn.Embedding and
# nn.CrossEntropyLoss expect for indices. The legacy torch.Tensor(...)
# constructor would produce float32 values. as_tensor also accepts an
# existing tensor, so re-running this cell is harmless.
text_idx = torch.as_tensor(text_idx, dtype=torch.long)

In [None]:
# creates sequences of a certain length

#seq_length = 10

#text_words_seq = []
#text_idx_seq = []

#for i in range(0, len(text_words.split()) - seq_length, seq_length):
#  seq_wds = ' '.join(text_words.split()[i : i + seq_length])
#  text_words_seq.append(seq_wds)

#  seq_idxs = text_idx[i : i + seq_length]
#  text_idx_seq.append(seq_idxs)

In [None]:
# 90 percent train, 5 percent validation, 5 percent test split (per the 0.9 / 0.95 cut points below)

#end1 = round(len(text_words_seq)*.9) # to get 90% for training
#end2 = round(len(text_words_seq)*.95) # to get 5% for validation and test
#print(end1)
#print(end2)


#train = torch.Tensor(text_idx_seq[0:end1])
#val_data = torch.Tensor(text_idx_seq[end1:end2])
#test_data = torch.Tensor(text_idx_seq[end1:])

#train_data = train.long()
#val_data = val_data.long()
#test_data = test_data.long()

In [None]:
class twoLayer_LSTM(nn.Module):
    """Language model built from an embedding, a stacked LSTM and a linear scorer.

    The embedding width equals the LSTM hidden width, and the final linear
    layer maps each hidden vector to one score per vocabulary entry.
    """

    def __init__(self, vocab_size, hidden_size, layers):
        """Create the three sub-layers.

        Args:
            vocab_size: number of distinct token ids (embedding rows and
                output scores).
            hidden_size: width of the embeddings and of the LSTM state.
            layers: number of stacked LSTM layers.
        """
        super().__init__()
        self.emb_layer = nn.Embedding(num_embeddings=vocab_size, embedding_dim=hidden_size)
        self.rec_layer = nn.LSTM(input_size=hidden_size, hidden_size=hidden_size, num_layers=layers)
        self.lin_layer = nn.Linear(in_features=hidden_size, out_features=vocab_size)
        # Bidirectional variant, kept for reference:
        #self.rec_layer = nn.LSTM(hidden_size, hidden_size, num_layers=layers, bidirectional=True)
        #self.lin_layer = nn.Linear(hidden_size*2, vocab_size)

    def forward(self, word_seq, h_init, c_init):
        """Score every position of `word_seq`.

        Args:
            word_seq: integer token ids; the LSTM is built with its default
                (non batch-first) layout, so time is the leading dimension.
            h_init: initial hidden state handed to the LSTM.
            c_init: initial cell state handed to the LSTM.

        Returns:
            A pair `(score_seq, (h_last, c_last))`: the per-position vocabulary
            scores and the LSTM's final hidden and cell states.
        """
        embedded = self.emb_layer(word_seq)
        outputs, final_state = self.rec_layer(embedded, (h_init, c_init))
        h_last, c_last = final_state
        return self.lin_layer(outputs), (h_last, c_last)

In [None]:
def evaluate(data):
    """Return the mean cross-entropy loss of the global `net` over `data`.

    `data` is walked along its first dimension in consecutive windows of
    `seq_length` rows; the targets are the same window shifted down by one
    row. Each window is flattened to `bs * seq_length` labels, so `data` is
    expected to have `bs` columns. The LSTM state is carried from one window
    to the next and no gradients are recorded.

    Relies on the notebook globals `net`, `criterion`, `device`, `layers`,
    `bs`, `hidden_size`, `seq_length` and `vocab_size`.

    Args:
        data: 2-D integer tensor of token ids (rows = time steps).

    Returns:
        The loss averaged over all evaluated windows, as a Python float.

    Raises:
        ValueError: if `data` has too few rows to form a single window.
    """
    running_loss = 0.0
    num_batches = 0
    with torch.no_grad():
        h = torch.zeros(layers, bs, hidden_size).to(device)
        c = torch.zeros(layers, bs, hidden_size).to(device)
        for count in range(0, len(data) - seq_length, seq_length):
            minibatch_data = data[count:count + seq_length].to(device)
            minibatch_label = data[count + 1:count + seq_length + 1].to(device)
            # Under no_grad the returned state carries no graph, so it can be
            # fed straight back in without detaching.
            scores, (h, c) = net(minibatch_data, h, c)
            minibatch_label = minibatch_label.view(bs * seq_length)
            scores = scores.view(bs * seq_length, vocab_size)
            loss = criterion(scores, minibatch_label)
            # Accumulate every window; returning only the last window's loss
            # would make the metric depend on where the data happens to end.
            running_loss += loss.item()
            num_batches += 1
    if num_batches == 0:
        raise ValueError('evaluate() needs more than seq_length rows of data')
    return running_loss / num_batches

def normalize_gradient(net):
    """Rescale the gradients of `net` in place to a global L2 norm of 1.

    The norm is taken over all parameters that currently hold a gradient;
    parameters whose `.grad` is None (for example, ones not used in the last
    forward pass) are skipped rather than causing an AttributeError. When the
    global norm is below 1e-4 the gradients are cleared instead of being
    divided by a near-zero value, and a message is printed.

    Args:
        net: module whose parameter gradients should be normalized.

    Returns:
        The global gradient norm measured before rescaling, as a float.
    """
    grads = [p.grad for p in net.parameters() if p.grad is not None]
    grad_norm_sq = 0
    for g in grads:
        grad_norm_sq += g.detach().norm() ** 2
    grad_norm = math.sqrt(grad_norm_sq)
    if grad_norm < 1e-4:
        net.zero_grad()
        print('grad norm close to zero')
    else:
        # In-place division is kept out of autograd's view.
        with torch.no_grad():
            for g in grads:
                g.div_(grad_norm)
    return grad_norm

In [None]:
# setup NN
hidden_size = 100
# word2idx already holds every kept word plus '<unk>', so its length is the
# exact number of classes. An extra "+1" slot would be an output index that
# idx2word cannot translate back into a word.
vocab_size = len(word2idx)
layers = 2
num_epoch = 5
bs = 10
seq_length = 10

# Use the GPU when one is available, otherwise run on the CPU instead of
# failing on machines without CUDA.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
net = twoLayer_LSTM(vocab_size, hidden_size, layers)
net.emb_layer.weight.data.uniform_(-0.1, 0.1)
# Tie the output projection to the embedding matrix (both are vocab_size x hidden_size).
net.lin_layer.weight = net.emb_layer.weight
net = net.to(device)
criterion = nn.CrossEntropyLoss()
train_size = len(text_idx)


def batchify(token_ids, batch_size):
    """Arrange a 1-D id stream into a (rows, batch_size) matrix.

    Tokens that do not fill a complete row are dropped. Each column is a
    contiguous slice of the stream, so reading down the rows walks through
    the text in order - the layout `evaluate` slices with
    `data[count:count + seq_length]`.
    """
    rows = token_ids.size(0) // batch_size
    return token_ids[:rows * batch_size].view(batch_size, rows).t().contiguous()


# 90% / 5% / 5% train / validation / test split of the token stream, taken
# in corpus order so the three parts do not overlap.
all_ids = torch.as_tensor(text_idx).long()
train_end = round(len(all_ids) * 0.90)
val_end = round(len(all_ids) * 0.95)
train_data = batchify(all_ids[:train_end], bs)
val_data = batchify(all_ids[train_end:val_end], bs)
test_data = batchify(all_ids[val_end:], bs)

In [None]:
# training with SGD
#
# Expects train_data / val_data / test_data to be integer tensors in the same
# (rows, bs) time-major layout that evaluate() slices: each minibatch is
# seq_length consecutive rows, and its targets are those rows shifted by one.
start = time.time()

train_loss_list = []
val_loss_list = []
test_loss_list = []

# Build the optimizer once so the momentum buffers persist across epochs;
# only the learning rate is changed per epoch.
optimizer = optim.SGD(net.parameters(), lr=1.5, momentum=0.9)

for epoch in range(num_epoch):
    # exponentially decaying learning rate
    my_lr = 1.5 * math.exp(-0.5 * epoch)
    for group in optimizer.param_groups:
        group['lr'] = my_lr

    # set the running quantities to zero at the beginning of the epoch
    running_loss = 0.0
    num_batches = 0

    # set the initial h and c to the zero vector and send them to the device
    h = torch.zeros(layers, bs, hidden_size).to(device)
    c = torch.zeros(layers, bs, hidden_size).to(device)

    for count in range(0, len(train_data) - seq_length, seq_length):
        # Set the gradients to zeros
        optimizer.zero_grad()

        # create a minibatch of shape (seq_length, bs) and its next-token labels
        minibatch_data = train_data[count : count + seq_length].to(device)
        minibatch_label = train_data[count + 1 : count + seq_length + 1].to(device)

        # Detach to prevent from backpropagating all the way to the beginning
        # Then tell Pytorch to start tracking all operations that will be done on h and c
        h = h.detach()
        c = c.detach()
        h = h.requires_grad_()
        c = c.requires_grad_()

        # forward the minibatch through the net
        scores, (h, c) = net(minibatch_data, h, c)

        # reshape the scores and labels to huge batch of size bs*seq_length
        scores = scores.view(bs * seq_length, vocab_size)
        minibatch_label = minibatch_label.view(bs * seq_length)

        # Compute the average of the losses of the data points in this huge batch
        loss = criterion(scores, minibatch_label)

        # backward pass, then one optimizer step on the unit-norm gradient
        loss.backward()
        normalize_gradient(net)
        optimizer.step()

        # Accumulate as a tensor so the device is not synchronized every step.
        running_loss += loss.detach()
        num_batches += 1

    # mean training loss over the epoch (less noisy than the last minibatch alone)
    total_loss = float(running_loss) / num_batches
    elapsed = time.time() - start
    print('\nepoch =', epoch, '\t time = {0:.1f}'.format(elapsed),'\t lr = {0:.3f}'.format(my_lr), '\t training loss = {0:.3f}'.format(total_loss))
    val_loss = evaluate(val_data) # eval on the validation set
    train_loss_list.append(total_loss)
    val_loss_list.append(val_loss)
    test_loss = evaluate(test_data) # eval on the test set
    test_loss_list.append(test_loss)
    print('val loss = {0:.3f}'.format(val_loss))
    print('test loss = {0:.3f}'.format(test_loss))

print(" ")

In [None]:
# training with Adagrad
#
# Expects train_data / test_data to be integer tensors in the same (rows, bs)
# time-major layout that evaluate() slices: each minibatch is seq_length
# consecutive rows, and its targets are those rows shifted by one.
#
# NOTE(review): this cell keeps training whatever weights `net` currently
# holds and resets the loss lists; re-create `net` first if the run is meant
# to be independent of an earlier training cell.
start = time.time()

train_loss_list = []
val_loss_list = []
test_loss_list = []

# Build the optimizer once so Adagrad's per-parameter accumulators persist
# across epochs; only the learning rate is changed per epoch.
optimizer = optim.Adagrad(net.parameters(), lr=0.1)

for epoch in range(num_epoch):
    # exponentially decaying learning rate
    my_lr = 0.1 * math.exp(-0.5 * epoch)
    for group in optimizer.param_groups:
        group['lr'] = my_lr

    # set the running quantities to zero at the beginning of the epoch
    running_loss = 0.0
    num_batches = 0

    # set the initial h and c to the zero vector and send them to the device
    h = torch.zeros(layers, bs, hidden_size).to(device)
    c = torch.zeros(layers, bs, hidden_size).to(device)

    # Iterate over the rows of train_data itself, seq_length rows at a time.
    for count in range(0, len(train_data) - seq_length, seq_length):
        # Set the gradients to zeros
        optimizer.zero_grad()

        # create a minibatch of shape (seq_length, bs) and its next-token labels
        minibatch_data = train_data[count : count + seq_length].to(device)
        minibatch_label = train_data[count + 1 : count + seq_length + 1].to(device)

        # Detach to prevent from backpropagating all the way to the beginning
        # Then tell Pytorch to start tracking all operations that will be done on h and c
        h = h.detach()
        c = c.detach()
        h = h.requires_grad_()
        c = c.requires_grad_()

        # forward the minibatch through the net
        scores, (h, c) = net(minibatch_data, h, c)

        # reshape the scores and labels to huge batch of size bs*seq_length
        scores = scores.view(bs * seq_length, vocab_size)
        minibatch_label = minibatch_label.view(bs * seq_length)

        # Compute the average of the losses of the data points in this huge batch
        loss = criterion(scores, minibatch_label)

        # backward pass, then one optimizer step on the unit-norm gradient
        loss.backward()
        normalize_gradient(net)
        optimizer.step()

        # Accumulate as a tensor so the device is not synchronized every step.
        running_loss += loss.detach()
        num_batches += 1

    # mean training loss over the epoch (less noisy than the last minibatch alone)
    total_loss = float(running_loss) / num_batches
    elapsed = time.time() - start
    print('\nepoch =', epoch, '\t time = {0:.1f}'.format(elapsed),'\t lr = {0:.3f}'.format(my_lr), '\t training loss = {0:.3f}'.format(total_loss))
    train_loss_list.append(total_loss)
    test_loss = evaluate(test_data) # eval on the test set
    test_loss_list.append(test_loss)
    print('test loss = {0:.3f}'.format(test_loss))

print(" ")

In [None]:
# Plot the per-epoch training and test losses on a shared axis.
x = range(num_epoch)

fig, ax = plt.subplots()
ax.plot(x, train_loss_list, '.-', label='Train Loss')
ax.plot(x, test_loss_list, '.-', label='Test Loss')
ax.set_xlabel('Epoch')
ax.set_ylabel('Loss')
ax.legend()
plt.show()

In [None]:
# Reverse lookup table: vocabulary id -> word.
idx2word = dict(zip(word2idx.values(), word2idx.keys()))

In [None]:
def show_most_likely_words(prob, num_word_display=15):
    """Print the most probable next words with their probabilities.

    Args:
        prob: tensor of probabilities over the vocabulary; it is flattened,
            so any shape holding one value per vocabulary id works.
        num_word_display: maximum number of words to print. Fewer are shown
            when `prob` has fewer entries.

    Each line is printed as a percentage, a tab, and the word looked up in
    the global `idx2word`; an id without an entry is shown as '<unk>'.
    """
    p = prob.view(-1)
    # topk raises if k exceeds the number of entries, so clamp it.
    k = min(num_word_display, p.numel())
    p, word_idx = torch.topk(p, k)
    for probability, idx in zip(p.tolist(), word_idx.tolist()):
        percentage = probability * 100
        word = idx2word.get(idx, '<unk>')
        print("{:.1f}%\t".format(percentage), word)

def text2tensor(text):
    """Convert a sentence into a 1-D LongTensor of vocabulary ids.

    The sentence is lower-cased and split on whitespace. Each token is looked
    up in the global `word2idx`; tokens that are not present map to the last
    id of the table (`len(word2idx) - 1`).
    """
    fallback_idx = len(word2idx) - 1
    tokens = text.lower().split()
    return torch.LongTensor([word2idx.get(token, fallback_idx) for token in tokens])

In [None]:
sentence = "antonio :"

# Encode the prompt as a single sequence of shape (seq_len, 1); there is no
# need to pad the batch dimension up to the training batch size.
data = text2tensor(sentence)
seq_len = len(data)
data = data.view(seq_len, 1).to(device)

# Fresh zero state for a batch of one sequence.
h = torch.zeros(layers, 1, hidden_size).to(device)
c = torch.zeros(layers, 1, hidden_size).to(device)

# Inference only: skip building the autograd graph.
with torch.no_grad():
    scores, (h, c) = net(data, h, c)

# Keep the scores at the last time step and turn them into probabilities.
scores = scores[seq_len - 1, 0, :]
p = F.softmax(scores.view(1, vocab_size), dim=1)
print(sentence, '... \n')
show_most_likely_words(p)