In [1]:
import torch
import torch.nn as nn
import nltk
import numpy as np
#nltk.download('punkt')

In [2]:
data_dir = 'songs.txt'
with open(data_dir, "r", encoding='utf-8') as f:
    data = f.read()

In [3]:
tokens = nltk.word_tokenize(data)
print('total words:', len(tokens))
print('a batch of data:', tokens[:10])

total words:211681
a batch of data:['Does', "n't", 'take', 'much', 'to', 'make', 'me', 'happy', 'And', 'make']


In [48]:
from collections import Counter

def create_lookup_tables(text):

    word_counts = Counter(text)
    sorted_vocab = sorted(word_counts, key=word_counts.get, reverse=True)
    vocab_to_int = {word: i for i, word in enumerate(sorted_vocab)}
    int_to_vocab = {word: i for i, word in vocab_to_int.items()}
    
    return (vocab_to_int, int_to_vocab)

vocab_to_int, int_to_vocab = create_lookup_tables(tokens)

In [5]:
train_on_gpu = torch.cuda.is_available()
print('training on', ' GPU' if train_on_gpu else ' CPU')

training on GPU


In [6]:
from torch.utils.data import TensorDataset, DataLoader


def create_batches(words, sequence_length, batch_size):

    n_batches = len(words)//batch_size
    words = words[:n_batches*batch_size]

    features, targets = [], []
    for idx in range(0, (len(words) - sequence_length) ):
        features.append(words[idx : idx + sequence_length])
        targets.append(words[idx + sequence_length])   
        
    data = TensorDataset(torch.from_numpy(np.asarray(features)), torch.from_numpy(np.asarray(targets)))
    data_loader = torch.utils.data.DataLoader(data, shuffle=False , batch_size = batch_size)

    # return a dataloader
    return data_loader

In [7]:
def forward_back_prop(rnn, optimizer, criterion, inp, target, hidden):
    
    hidden = tuple([each.data for each in hidden])
    
    if train_on_gpu:
        inp = inp.cuda()
        target = target.cuda()
    # perform backpropagation and optimization
    rnn.zero_grad()

    output, hidden = rnn(inp, hidden)
    target = torch.tensor(target).to(torch.int64)
    loss = criterion(output, target)
    loss.backward()
    nn.utils.clip_grad_norm_(rnn.parameters(), 5)
    optimizer.step()

    # return the loss over a batch and the hidden state produced by our model
    return loss.item(), hidden


In [8]:
def train_rnn(rnn, batch_size, optimizer, criterion, n_epochs, save_every=0):
    
    batch_losses = []
    total_time = 0

    rnn.train()
    start = torch.cuda.Event(enable_timing=True)
    end = torch.cuda.Event(enable_timing=True)

    print("Training for %d epoch(s)..." % n_epochs)
    for epoch_i in range(1, n_epochs + 1):
        
        # initialize hidden state
        hidden = rnn.init_hidden(batch_size)

        start.record()

        for batch_i, (inputs, labels) in enumerate(train_loader, 1):
            
            # make sure you iterate over completely full batches, only
            n_batches = len(train_loader.dataset)//batch_size
            if(batch_i > n_batches):
                break
            
            # forward, back prop
            loss, hidden = forward_back_prop(rnn, optimizer, criterion, inputs, labels, hidden)          
            # record loss
            batch_losses.append(loss)

            # printing loss stats
            import sys
            progress = batch_i * 100/n_batches
            print('\r' + 'Epoch: {:>4}/{:<4}  Loss: {:7}, epoch progress: {:2}%'.format(
                epoch_i, n_epochs, str(np.average(batch_losses))[:14], str(progress)[:4]), end="")
            sys.stdout.flush()

            batch_losses = []

        end.record()
        torch.cuda.synchronize()
        time = start.elapsed_time(end)
        print('\r')
        print('epoch finished in: '+ str(time/60000)[:5], ' minutes')
        total_time += time    

        if (epoch_i == n_epochs):
            print('\r')
            print('training finished in: '+ str(total_time/60000)[:8], ' minutes')

        if save_every != 0 and epoch_i % save_every == 0 and epoch_i != n_epochs:
            print('saving model...')
            torch.save(rnn.state_dict(), 'saved_models/trained_rnn_' + str(epoch_i) + '.pt')

    # returns a trained rnn
    return rnn

In [9]:
sequence_length = 10
batch_size = 64

int_text = [vocab_to_int[word] for word in tokens]
print('a sample of text:', int_text[:5])

train_loader = create_batches(int_text, sequence_length, batch_size)

a sample of text:[1090, 9, 104, 125, 5]


In [10]:
# Training parameters
# Number of Epochs
num_epochs = 20
# Learning Rate
learning_rate = 0.001

# Model parameters
# Vocab size
vocab_size = len(vocab_to_int)
# Output size
output_size = vocab_size
# Embedding Dimension
embedding_dim = 512
# Hidden Dimension
hidden_dim = 256
# Number of RNN Layers
n_layers = 2

In [11]:
%load_ext autoreload
%autoreload 2

import models
# create model and move to gpu if available
rnn = models.RNN(vocab_size, output_size, embedding_dim, hidden_dim, n_layers, dropout=0.5)
if train_on_gpu:
    rnn.cuda()

# defining loss and optimization functions for training
optimizer = torch.optim.Adam(rnn.parameters(), lr=learning_rate)
criterion = nn.CrossEntropyLoss()

# training the model
try:
    trained_rnn = train_rnn(rnn, batch_size, optimizer, criterion, num_epochs, save_every=5)
    # saving the trained model
    torch.save(trained_rnn, 'saved_models/final_trained_rnn.pt')

except KeyboardInterrupt:
    print('\ntraining stopped')


Training for 20 epoch(s)...
Epoch:    1/20    Loss: 5.799723625183, epoch progress: 100.%
epoch finished in:1.899 minutes
Epoch:    2/20    Loss: 5.585914134979, epoch progress: 100.%
epoch finished in:1.889 minutes
Epoch:    3/20    Loss: 5.286500930786, epoch progress: 100.%
epoch finished in:1.898 minutes
Epoch:    4/20    Loss: 5.080548286437, epoch progress: 100.%
epoch finished in:1.883 minutes
Epoch:    5/20    Loss: 4.889713764190, epoch progress: 100.%
epoch finished in:1.882 minutes
saving model...
Epoch:    6/20    Loss: 4.686353683471, epoch progress: 100.%
epoch finished in:2.755 minutes
Epoch:    7/20    Loss: 4.731459140777, epoch progress: 100.%
epoch finished in:1.893 minutes
Epoch:    8/20    Loss: 4.726864337921, epoch progress: 100.%
epoch finished in:2.684 minutes
Epoch:    9/20    Loss: 4.481072425842, epoch progress: 100.%
epoch finished in:1.931 minutes
Epoch:   10/20    Loss: 4.199986457824, epoch progress: 100.%
epoch finished in:1.867 minutes
saving model...


In [12]:
model = 'saved_models/final_trained_rnn.pt'
picked_rnn = torch.load(model)

In [43]:
def generate(rnn, prime_id, int_to_vocab, token_dict, pad_value, predict_len=100):
    """
    Generate text using the neural network
    :param decoder: The PyTorch Module that holds the trained neural network
    :param prime_id: The word id to start the first prediction
    :param int_to_vocab: Dict of word id keys to word values
    :param token_dict: Dict of puncuation tokens keys to puncuation values
    :param pad_value: The value used to pad a sequence
    :param predict_len: The length of text to generate
    :return: The generated text
    """
    rnn.eval()
    
    # create a sequence (batch_size=1) with the prime_id
    current_seq = np.full((1, sequence_length), pad_value)
    current_seq[-1][-1] = prime_id
    predicted = [int_to_vocab[prime_id]]
    
    for _ in range(predict_len):
        if train_on_gpu:
            current_seq = torch.LongTensor(current_seq).cuda()
        else:
            current_seq = torch.LongTensor(current_seq)
        
        # initialize the hidden state
        hidden = rnn.init_hidden(current_seq.size(0))
        
        # get the output of the rnn
        output, _ = rnn(current_seq, hidden)
        
        # get the next word probabilities
        p = F.softmax(output, dim=1).data
        if(train_on_gpu):
            p = p.cpu() # move to cpu
         
        # use top_k sampling to get the index of the next word
        top_k = 5
        p, top_i = p.topk(top_k)
        top_i = top_i.numpy().squeeze()
        
        # select the likely next word index with some element of randomness
        p = p.numpy().squeeze()
        word_i = np.random.choice(top_i, p=p/p.sum())
        
        # retrieve that word from the dictionary
        word = int_to_vocab[word_i]
        predicted.append(word)     
        
        # the generated word becomes the next "current sequence" and the cycle can continue
        if train_on_gpu:
            current_seq = current_seq.cpu()
        current_seq = np.roll(current_seq, -1, 1)
        current_seq[-1][-1] = word_i
    
    gen_sentences = ' '.join(predicted)
    
    # Replace punctuation tokens
    for key, token in token_dict.items():
        ending = ' ' if key in ['(', '"'] else ''
        gen_sentences = gen_sentences.replace(' ' + token.lower(), key)
    gen_sentences = gen_sentences.replace('\n ', '\n')
    gen_sentences = gen_sentences.replace('( ', '(')
    
    # return all the sentences
    return gen_sentences

In [51]:
import torch.nn.functional as F
import pickle
token_dict  = {}
# run the cell multiple times to get different results!
gen_length = 100 # modify the length to your preference
prime_word = 'love' # name for starting the script

token_dict = {".":"Period", 
            ",": "Comma",
            "\"":"Quotation_Mark",
            ";": "Semicolon",
            "!":"Exclamation_mark",
            "?":"Question_mark",
            "(":"Left_Parentheses",
            ")":"Right_Parentheses",
            "-":"Dash",
            "\n":"Return"}

generated_script = generate(picked_rnn, vocab_to_int[prime_word], int_to_vocab, token_dict, vocab_to_int['love'], gen_length)
print(generated_script)

love , when love is all you need to love me Bright lips of the rain of mine Well , you 've collected you cry And I 'm amazed at the way you 're near Words of my life and mine you 'll be a part of mine . And if you 're next , I 've been thinking about you ) I 'm all alone ( I 'm gon na stick like glue Stick because I 'm stuck on you Hide in the kitchen , hide around his steed covered to wail That they 've swung us since we 've


In [41]:
# save script to a text file
song = open("generated_song_1.txt", "w")
song.write(generated_script)
song.close()