In [1]:
import re
import numpy as np
import string

In [2]:
# Read the whole lyrics file; the context manager closes the handle even if
# reading fails (the bare open() previously left it open for the kernel's life).
with open("kanye_verses.txt", "r", encoding="utf8") as file:
    text = file.read()

# Replace each double newline with a single one.
text = text.replace("\n\n", "\n")

In [3]:
def clean_lyric(txt):
    """Reduce a lyric to lowercase ASCII letters and spaces.

    Characters other than a-z, apostrophe and space are deleted first; the
    apostrophes are then deleted as well, so "can't" becomes "cant".
    Upper-case letters are deleted rather than lowered, so lower-case the
    text before calling this.
    """
    without_symbols = re.sub("[^a-z' ]", "", txt)
    return without_symbols.replace("'", "")

In [4]:
# One lyric per line, lower-cased so clean_lyric's a-z filter keeps the letters.
# np.unique de-duplicates (and sorts). Empty lines are filtered out explicitly:
# the previous `[1:]` slice assumed "" always sorts first, and when the text
# contained no empty line it silently discarded a real lyric instead.
lyrics = [line for line in np.unique(text.lower().split("\n")).tolist() if line]
cleaned_lyrics = [clean_lyric(lyric) for lyric in lyrics]

In [5]:
def create_sequences(lyric, seq_len):
    """Slide a window of ``seq_len + 1`` words across a lyric.

    Args:
        lyric: Whitespace-separated text.
        seq_len: Number of leading words per window; each window holds one
            additional word after them.

    Returns:
        A list of space-joined windows, one per position. When the lyric has
        ``seq_len`` words or fewer it is returned unchanged as the only item.
    """
    # Split once up front; the lyric used to be re-split on every iteration.
    words = lyric.split()
    if len(words) <= seq_len:
        return [lyric]

    return [
        " ".join(words[end - seq_len:end + 1])
        for end in range(seq_len, len(words))
    ]

In [6]:
# seq_len=2 yields three-word windows; lyrics of two words or fewer come back whole.
raw_sequences = list(map(lambda lyric: create_sequences(lyric, 2), cleaned_lyrics))

In [7]:
# Flatten the per-lyric lists in one pass; `sum(lists, [])` re-copies the
# accumulated list for every lyric, which is quadratic in the corpus size.
# np.unique then de-duplicates and sorts the sequences.
sequences = np.unique(
    np.array([seq for group in raw_sequences for seq in group])
).tolist()

In [151]:
# Vocabulary: every distinct word across all sequences, in sorted order.
# split() with no argument is used instead of split(" ") so that repeated,
# leading or trailing spaces in a sequence cannot add an empty-string "word".
uniq_words = np.unique(np.array(" ".join(sequences).split()))
uniq_words_idx = np.arange(uniq_words.size)

# Two-way lookup between words and their integer ids.
word_to_idx = dict(zip(uniq_words.tolist(), uniq_words_idx.tolist()))
idx_to_word = dict(zip(uniq_words_idx.tolist(), uniq_words.tolist()))

vocab_size = len(word_to_idx)

In [99]:
# Build (input, target) pairs from the three-word sequences only:
# the input is the first two words, the target is the last two
# (i.e. the input shifted forward by one word).
x_word, y_word = [], []

for seq in sequences:
    tokens = seq.split()
    if len(tokens) == 3:
        x_word.append(" ".join(tokens[:-1]))
        y_word.append(" ".join(tokens[1:]))

In [33]:
def get_seq_idx(seq):
    """Translate each whitespace-separated word of ``seq`` into its integer id.

    Ids are looked up in the module-level ``word_to_idx``; a word that is
    missing from it raises ``KeyError``.
    """
    indices = []
    for word in seq.split():
        indices.append(word_to_idx[word])
    return indices

In [104]:
# Encode the word pairs as integer-id arrays, one row per sequence.
x_idx = np.array(list(map(get_seq_idx, x_word)))
y_idx = np.array(list(map(get_seq_idx, y_word)))

In [75]:
def get_next_batch(x, y, batch_size):
    """Yield consecutive ``(x, y)`` mini-batches of exactly ``batch_size`` rows.

    Args:
        x: 2-D array of inputs, one row per sample.
        y: 2-D array of targets, row-aligned with ``x``.
        batch_size: Number of rows per batch.

    Yields:
        ``(x_batch, y_batch)`` tuples. Trailing rows that do not fill a
        complete batch are skipped, so every batch has the same size.
    """
    # range()'s stop is exclusive, so it has to be row count + 1: with a stop
    # of x.shape[0] the final full batch was dropped whenever the number of
    # rows was an exact multiple of batch_size.
    for end in range(batch_size, x.shape[0] + 1, batch_size):
        start = end - batch_size
        yield x[start:end, :], y[start:end, :]

In [36]:
import torch
import torch.nn as nn

In [167]:
class WordLSTM(nn.Module):
    """Word-level language model: embedding -> stacked LSTM -> dropout -> linear.

    Args:
        n_hidden: Size of the LSTM hidden state.
        n_layers: Number of stacked LSTM layers.
        drop_prob: Dropout probability, used between LSTM layers and on the
            LSTM output.
        lr: Stored as ``self.lr``; not used anywhere inside the class.
        embed_size: Embedding dimension (previously hard-coded to 200).
        n_vocab: Vocabulary size. ``None`` (the default) falls back to the
            module-level ``vocab_size``, which was the previous behaviour.
    """

    def __init__(self, n_hidden=256, n_layers=4, drop_prob=0.3, lr=0.001,
                 embed_size=200, n_vocab=None):
        super().__init__()

        if n_vocab is None:
            n_vocab = vocab_size

        self.drop_prob = drop_prob
        self.n_layers = n_layers
        self.n_hidden = n_hidden
        self.lr = lr
        self.embed_size = embed_size
        self.n_vocab = n_vocab

        # word id -> dense vector
        self.emb_layer = nn.Embedding(n_vocab, embed_size)

        # batch_first=True: inputs and outputs are (batch, seq, feature)
        self.lstm = nn.LSTM(embed_size, n_hidden, n_layers,
                            dropout=drop_prob, batch_first=True)

        self.dropout = nn.Dropout(drop_prob)

        # hidden state -> one score per vocabulary word
        self.fc = nn.Linear(n_hidden, n_vocab)

    def forward(self, x, hidden):
        """Run one forward pass.

        Args:
            x: Integer tensor of word ids, shaped (batch, seq).
            hidden: ``(h, c)`` tuple as produced by ``init_hidden``.

        Returns:
            ``(out, hidden)`` where ``out`` holds unnormalised scores with
            shape (batch * seq, vocabulary size) and ``hidden`` is the
            updated LSTM state.
        """
        embedded = self.emb_layer(x)
        lstm_output, hidden = self.lstm(embedded, hidden)
        out = self.dropout(lstm_output)

        # Fold batch and time into one axis so the linear layer scores every
        # position independently.
        out = out.reshape(-1, self.n_hidden)
        out = self.fc(out)

        return out, hidden

    def init_hidden(self, batch_size):
        """Return a zeroed ``(h, c)`` state for ``batch_size`` sequences.

        Both tensors are shaped (n_layers, batch_size, n_hidden) and are
        created with the dtype and device of the model's own parameters.
        Previously the state was moved to CUDA whenever a GPU was present,
        even when the model itself was on the CPU, which made the forward
        pass fail with a device mismatch.
        """
        weight = next(self.parameters()).detach()
        shape = (self.n_layers, batch_size, self.n_hidden)
        return (weight.new_zeros(shape), weight.new_zeros(shape))

In [168]:
# Model architecture
embed_size = 200
num_hidden = 256
num_layers = 4
drop_prob = 0.3

# Optimisation
lr = 0.001
batch_size = 32
num_epochs = 15

In [173]:
# Build the model from the hyperparameters defined earlier in the notebook
# instead of relying on the constructor defaults happening to match them.
model = WordLSTM(n_hidden=num_hidden, n_layers=num_layers,
                 drop_prob=drop_prob, lr=lr)

In [181]:
def train(net, epochs=10, batch_size=32, lr=0.001, clip=1, print_every=32):
    """Train ``net`` on the module-level ``x_idx`` / ``y_idx`` arrays.

    Args:
        net: Model exposing ``init_hidden(batch_size)`` and returning
            ``(output, hidden)`` when called with ``(inputs, hidden)``.
        epochs: Number of passes over the data.
        batch_size: Rows per mini-batch; also sizes the hidden state.
        lr: Learning rate for the Adam optimiser.
        clip: Maximum gradient norm applied before each optimiser step.
        print_every: A progress line is printed every this many steps.

    The hidden state is created once per epoch and carried from batch to
    batch, detached from the previous batch's graph each time.
    """
    opt = torch.optim.Adam(net.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()

    # Keep inputs, targets and hidden state on whichever device the model is
    # on. Call net.cuda() before train() to use a GPU; nothing here assumes
    # one, so a CPU model no longer receives CUDA tensors.
    device = next(net.parameters()).device

    counter = 0

    net.train()

    for e in range(epochs):

        # fresh zero state at the start of every epoch
        h = tuple(each.to(device) for each in net.init_hidden(batch_size))

        for x, y in get_next_batch(x_idx, y_idx, batch_size):
            counter += 1

            # numpy -> int64 tensors on the model's device
            inputs = torch.from_numpy(x).long().to(device)
            targets = torch.from_numpy(y).long().to(device)

            # Cut the graph so back-propagation stops at this batch.
            h = tuple(each.detach() for each in h)

            net.zero_grad()

            output, h = net(inputs, h)

            # output is (batch * seq, vocab); flatten targets to match
            loss = criterion(output, targets.view(-1))
            loss.backward()

            # Clip the gradient norm to guard against exploding gradients.
            nn.utils.clip_grad_norm_(net.parameters(), clip)

            opt.step()

            if counter % print_every == 0:
                print("Epoch: {}/{}...".format(e+1, epochs),
                      "Step: {}...".format(counter))

In [182]:
# `net` is never defined in this notebook; the model instance is named `model`.
train(model, batch_size=32, epochs=20, print_every=256)

Epoch: 1/20... Step: 256...
Epoch: 1/20... Step: 512...
Epoch: 1/20... Step: 768...
Epoch: 1/20... Step: 1024...
Epoch: 2/20... Step: 1280...
Epoch: 2/20... Step: 1536...
Epoch: 2/20... Step: 1792...
Epoch: 2/20... Step: 2048...
Epoch: 3/20... Step: 2304...
Epoch: 3/20... Step: 2560...
Epoch: 3/20... Step: 2816...
Epoch: 3/20... Step: 3072...
Epoch: 4/20... Step: 3328...
Epoch: 4/20... Step: 3584...
Epoch: 4/20... Step: 3840...
Epoch: 4/20... Step: 4096...
Epoch: 5/20... Step: 4352...
Epoch: 5/20... Step: 4608...
Epoch: 5/20... Step: 4864...
Epoch: 5/20... Step: 5120...
Epoch: 5/20... Step: 5376...
Epoch: 6/20... Step: 5632...
Epoch: 6/20... Step: 5888...
Epoch: 6/20... Step: 6144...
Epoch: 6/20... Step: 6400...
Epoch: 7/20... Step: 6656...
Epoch: 7/20... Step: 6912...
Epoch: 7/20... Step: 7168...
Epoch: 7/20... Step: 7424...
Epoch: 8/20... Step: 7680...
Epoch: 8/20... Step: 7936...
Epoch: 8/20... Step: 8192...
Epoch: 8/20... Step: 8448...
Epoch: 9/20... Step: 8704...
Epoch: 9/20... St

In [204]:
def predict(net, tkn, h=None):
    """Pick the next word after ``tkn`` at random from the model's top three.

    Args:
        net: Model exposing ``init_hidden`` and returning ``(scores, hidden)``
            when called with ``(inputs, hidden)``.
        tkn: Current word; must be a key of the module-level ``word_to_idx``
            (``KeyError`` otherwise).
        h: Hidden state from the previous step. ``None`` starts from a fresh
            zero state (the default used to crash when iterating over None).

    Returns:
        ``(next_word, hidden)``: the sampled word and the updated state.
    """
    device = next(net.parameters()).device

    if h is None:
        h = net.init_hidden(1)

    # a batch of one sequence holding one word id
    x = np.array([[word_to_idx[tkn]]])
    inputs = torch.from_numpy(x).type(torch.LongTensor).to(device)

    # detach the state from its history and keep it on the model's device
    h = tuple(each.detach().to(device) for each in h)

    # inference only: no autograd graph is needed
    with torch.no_grad():
        out, h = net(inputs, h)
        p = F.softmax(out, dim=1)

    p = p.cpu().numpy()
    p = p.reshape(p.shape[1],)

    # ids of the (up to) three most probable words, best first
    top_n_idx = p.argsort()[-3:][::-1]

    # choose uniformly among them; also works when fewer than three exist
    sampled_token_index = random.choice(top_n_idx.tolist())

    return idx_to_word[sampled_token_index], h

In [205]:
import torch.nn.functional as F
import random

In [255]:
def sample(net, size, prime):
    """Generate text by repeatedly predicting the next word.

    Args:
        net: Trained model; it is switched to eval mode.
        size: Number of words to generate after the primer (at least one
            word is always generated).
        prime: One or more whitespace-separated seed words.

    Returns:
        The primer followed by the generated words, joined by single spaces.

    Raises:
        ValueError: If ``prime`` contains no words. Previously this surfaced
            as an UnboundLocalError because no first token was ever predicted.
    """
    toks = prime.split()
    if not toks:
        raise ValueError("prime must contain at least one word")

    net.eval()

    # batch size is 1
    h = net.init_hidden(1)

    # Feed the primer word by word to warm up the hidden state; only the
    # prediction made after its last word is kept.
    for t in list(toks):
        token, h = predict(net, t, h)

    toks.append(token)

    # Each generated word becomes the input for the next prediction.
    for _ in range(size - 1):
        token, h = predict(net, toks[-1], h)
        toks.append(token)

    return ' '.join(toks)

In [270]:
# `net` is never defined in this notebook; the model instance is named `model`.
sample(model, 10, "with")

'with a fishstick bitch like i cant spend the presidito hola'