In [5]:
from __future__ import unicode_literals, print_function, division
from io import open
import unicodedata
import string
import re
import random
import pandas as pd
import nltk 
import copy
import os
import json
import numpy as np
import pdb

import torch
import torch.nn as nn
import torchtext.vocab as vocab
from torch.autograd import Variable
from torch import optim
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F

%matplotlib inline

list_of_phonemes = ['AA','AE','AH','AO','AW','AY','B','CH','D','DH','EH','ER','EY','F','G', 'HH', 'IH', 'IY','JH','K','L','M','N','NG','OW','OY','P','R','S','SH','T','TH','UH','UW','V','W','Y','Z','ZH']
vowels=['AA','AE','AH','AO','AW','AY','EH','ER','EY','IH','IY','OW','OY','UH','UW','Y']
arpabet = nltk.corpus.cmudict.dict()
use_cuda = torch.cuda.is_available()

In [6]:
import time
import math


def asMinutes(s):
    m = math.floor(s / 60)
    s -= m * 60
    return '%dm %ds' % (m, s)


def timeSince(since, percent):
    now = time.time()
    s = now - since
    es = s / (percent)
    rs = es - s
    return '%s (- %s)' % (asMinutes(s), asMinutes(rs))

In [7]:
# this function takes a pronunciation and returns a list of the vowels that appeared in it
def get_vowels(pronunciation):
    found_vowels = []
    for sound in pronunciation:
        if sound in vowels:
            found_vowels.append(sound)
    return found_vowels

def nonzero(thing):
    return (len(thing)>0)

In [8]:
glove = vocab.GloVe(name='6B', dim=100)

print('Loaded {} words'.format(len(glove.itos)))

.vector_cache/glove.6B.zip:   1%|          | 6.70M/862M [00:06<14:43, 969kB/s]    


KeyboardInterrupt: 

In [9]:
def get_word(word):
    return glove.vectors[glove.stoi[word]]

In [15]:
os.path.normpath(os.getcwd())

'/home/maximus/fall/seq2seq_raplyrics'

In [24]:
class LyricsDataFrame(Dataset):
    def __init__(self, root_dir, max_len):
        """
        Args:
            csv_file (string): Path to the csv file with annotations.
            root_dir (string): Directory with lyric files. 
        """
        
        self.phonemes2index = {0: "<SOS>", 1: "<EOS>", 2: "<PAD>"}
        self.phonemes2count = {}
        self.index2phonemes = {}
        
        self.words2index = {}
        self.words2count = {}
        self.index2words = {0: "<SOS>", 1: "<EOS>", 2: "<PAD>"}
        #self.words2prob = {}
        #self.words2probReverse = {}
        
        self.n_words = 3  # Count SOS and EOS
        self.n_phonemes = 3
        
        self.word2phoneme = {}
        
        self.max_len = max_len
        # takes the two files 'words_in_lyrics' and 'pho_dict' to construct
        # a dictionary that stores word to pronunciation conversions
        lst = [("<SOS>","<SOS>"),("<EOS>","<EOS>"),("<PAD>","<PAD>")]
        data_dir = os.path.normpath(os.getcwd() + "/data")
        words = [word.rstrip('\n') for word in open(data_dir + '/words_in_lyrics')]
        pronunciations = [word.rstrip('\n') for word in open(data_dir + '/pho_dict')]
        for ind in range(len(words)):
            lst.append((words[ind], pronunciations[ind].split()))
            
        self.word2pho = dict(lst)
        
        
        self.pairs = []
        # table is for removing odd characters
        table = str.maketrans('', '', string.punctuation)
        print("Reading lyrics...")
        error_counter = 0
        for file in os.listdir(data_dir + '/' + root_dir):
            print(file)
            with open(data_dir + '/lyric_files/' + file) as f:
                data = json.load(f)
                for song in data['songs']:
                    lines = [w.translate(table).lower() for w in song['lyrics'].split('\n')]
                    lines_filtered = [i for i in lines if len(i)>0]
                    lines = lines_filtered 
                    for ind in range(len(lines)-1):
                        line1 = lines[ind].split()
                        line2 = lines[ind+1].split()

                        #this gets rid of weird non-ascii characters like right quote and stuff like that 
                        line1 = [w.encode('ascii',errors='ignore').decode() for w in line1 if len(w.encode('ascii',errors='ignore').decode())>0]
                        line2 = [w.encode('ascii',errors='ignore').decode() for w in line2 if len(w.encode('ascii',errors='ignore').decode())>0]
                        #Add 'EOS' and 'BOS' tokens
                        try:
                            line1_vowels = get_vowels(self.word2pho[line1[-1]])
                            line2_vowels = get_vowels(self.word2pho[line2[-1]])

                            # this is a check to make sure all words in the line have glove embeddings
                            # also good bcus it'll get rid of weird words
                            for w in line1:
                                #get_word(w)
                                self.addWord(w)

                            #print(self.word2pho[line1[-1]])
                            for v in self.word2pho[line1[-1]]:
                                self.addPhoneme(v)

                            line1.append('<EOS>')
                            line2.append('<EOS>')
                            line1.insert(0, '<SOS>')
                            line2.insert(0, '<SOS>')

                            if line1_vowels[-1] == line2_vowels[-1] and \
                                len(line1) <= self.max_len and len(line1) <= self.max_len and \
                                line1 != line2:

                                while len(line1) < self.max_len:
                                    line1.append('<PAD>')
                                while len(line2) < self.max_len:
                                    line2.append('<PAD>')

                                self.pairs.append((line1, line2))
                        except:
                            break

        
    
    def addWord(self, word):
        if word not in self.words2index:
            self.words2index[word] = self.n_words
            self.words2count[word] = 1
            self.index2words[self.n_words] = word
            self.n_words += 1
        else:
            self.words2count[word] += 1
            
        
    def addPhoneme(self, phoneme):
        if phoneme not in self.phonemes2index:
            self.phonemes2index[phoneme] = self.n_phonemes
            self.phonemes2count[phoneme] = 1
            self.index2phonemes[self.n_phonemes] = phoneme
            self.n_phonemes += 1
        else:
            self.phonemes2count[phoneme] += 1
    
    def __len__(self):
        return len(self.pairs)

    def __getitem__(self, idx):
        return self.pairs[idx]

In [25]:
MAX_LEN = 35
lyrData = LyricsDataFrame('/lyric_files', MAX_LEN)
len(lyrData)

Reading lyrics...
Lyrics_Tyler,TheCreator.json
Lyrics_KanyeWest.json
Lyrics_TravisScott.json
Lyrics_NickiMinaj.json
Lyrics_Wu-TangClan.json
Lyrics_MacMiller.json
Lyrics_GhostfaceKillah.json
Lyrics_Common.json
Lyrics_Eminem.json
Lyrics_SnoopDogg.json
Lyrics_Nas.json
Lyrics_OutKast.json
Lyrics_N.W.A.json
Lyrics_2Pac.json
Lyrics_ChildishGambino.json
Lyrics_LilWayne.json
Lyrics_ChanceTheRapper.json
Lyrics_Migos.json
Lyrics_Drake.json
Lyrics_KidCudi.json
Lyrics_Lil_Kim.json
Lyrics_Pusha-T.json
Lyrics_Future.json
Lyrics_A$APRocky.json
Lyrics_TheRoots.json
Lyrics_TheNotoriousB.I.G..json
Lyrics_ATribeCalledQuest.json
Lyrics_VinceStaples.json
Lyrics_WizKhalifa.json
Lyrics_PlayboiCarti.json
Lyrics_DMX.json
Lyrics_MissyElliott.json
Lyrics_ScHoolboyQ.json
Lyrics_JAY-Z.json
Lyrics_BustaRhymes.json
Lyrics_CardiB.json
Lyrics_50Cent.json
Lyrics_KendrickLamar.json
Lyrics_DannyBrown.json
Lyrics_J.Cole.json


223223

In [26]:
random.choice(lyrData)

(['<SOS>',
  'little',
  'things',
  'hoes',
  'just',
  'be',
  'around',
  'always',
  'bickering',
  '<EOS>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>'],
 ['<SOS>',
  'i',
  'got',
  'some',
  'money',
  'threw',
  'some',
  'diamonds',
  'on',
  'a',
  'pinky',
  'ring',
  '<EOS>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>',
  '<PAD>'])

In [27]:
def indexesFromLine(lyr, line):
    l = []
    for word in line:
        l.append(lyr.words2index[word])
    return l


def variableFromLine(lyr, line):
    indexes = indexesFromLine(lyr, line)
    
    result = Variable(torch.LongTensor(indexes).view(-1, 1))
    if use_cuda:
        return result.cuda()
    else:
        return result

def variablesFromPair(pair):
    input_variable = variableFromLine(lyr, pair[0])
    target_variable = variableFromLine(lyr, pair[1])
    return (input_variable, target_variable)


def phonemeFromWord(lyr, word):
    l = []
    #print('word in pFw:', word)
    for pho in lyr.word2phoneme[word]:
        l.append(pho)

    indexes = [lyr.phonemes2index[pho] for pho in l]
    
    result = Variable(torch.LongTensor(indexes).view(-1, 1))
    if use_cuda:
        return result.cuda()
    else:
        return result

In [28]:
class phonemeOnlyEncoderRNN(nn.Module):
    def __init__(self, vocab_size, phoneme_vocab_size, phoneme_embedding_size, hidden_size):
        super(phonemeOnlyEncoderRNN, self).__init__()
        self.hidden_size = hidden_size
        
        self.phonemeEmbedding = nn.Embedding(phoneme_vocab_size, phoneme_embedding_size).cuda()
        self.phonemeLSTM = nn.LSTM(phoneme_embedding_size, hidden_size, num_layers=3).cuda()

        
    def forward(self, ipt, phoHidden):
        phonemeEmbedding = self.phonemeEmbedding(ipt).view(1, 1, -1)
        phonemeOutput, phonemeHidden = self.phonemeLSTM(phonemeEmbedding, phonemeHidden)
        return output, hidden 

    def initHidden(self):
        result = Variable(torch.zeros(2, 1, 1, self.hidden_size))
        if use_cuda:
            return result.cuda()
        else:
            return result

In [29]:
class phonemeOnlyDecoderRNN(nn.Module):
    def __init__(self, vocab_size, phoneme_vocab_size, phoneme_embedding_size, hidden_size, 
                     dropout_p=0.1, max_length=MAX_LEN):
        super(phonemeOnlyDecoderRNN, self).__init__()
        self.hidden_size = hidden_size
        self.output_size = vocab_size
        self.dropout_p = dropout_p
        self.max_length = max_length
        self.phoneme_embedding_size = phoneme_embedding_size
        self.phoneme_vocab_size = phoneme_vocab_size
    
        self.phoneme_embedding = nn.Embedding(self.phoneme_vocab_size, self.phoneme_embedding_size).cuda()
        self.attn_phoneme = nn.Linear(self.hidden_size + self.phoneme_embedding_size, self.max_length*3).cuda()
        self.attn_phoneme_combine = nn.Linear(self.hidden_size*2, self.hidden_size).cuda()
        
        self.phonemeLSTM = nn.LSTM(self.hidden_size, self.hidden_size).cuda()
        self.phoneme_dropout = nn.Dropout(self.dropout_p).cuda()
        self.wordoutput = nn.Linear(self.hidden_size, vocab_size).cuda()

    
    def forward(self, ipt, pho_hidden, encoder_phoneme_outputs):
        # ipt will be a word
        # phoHidden is the first hidden state
        # take the word, convert to phonemes
        # take phonemes, starting with phoHidden, feed them into LSTM
        # make word prediction based on final output
        
        phoneme_hidden = pho_hidden
        
        for phoneme in phonemeFromWord(lyr, input):
            phoneme_embedding = self.phoneme_embedding(phoneme).view(1,1,-1)
            phoneme_embedding = self.dropout(phoneme_embedding)
            
            attn_phoneme_weights = F.softmax(
                self.attn_phoneme(torch.cat((phoneme_embedding[0], phoneme_hidden[0]), 1)), dim=1)
            
            attn_phoneme_applied = torch.bmm(attn_phoneme_weights.unsqueeze(0),
                                             encoder_phoneme_outputs.unsqueeze(0))
            
            phoneme_output = torch.cat((phoneme_embedding[0], attn_phoneme_applied[0]), 1)
            phoneme_output = self.attn_phoneme_combine(phoneme_output).unsqueeze(0) 
            phoneme_output = F.relu(phoneme_output)
            phoneme_output, phoneme_hidden = self.phonemeLSTM(phoneme_ouput, phoneme_hidden)
            
        
        output = F.relu(phoneme_output)
        output = self.word_ouput(output)
        output = F.log_softmax(output[0])
        return output, phoneme_hidden, attn_weights

    def initHidden(self):
        result = Variable(torch.zeros(3, 1, self.hidden_size))
        if use_cuda:
            return result.cuda()
        else:
            return result

In [30]:
teacher_forcing_ratio = 0.1
def trainBackwards(input_variable, target_variable, encoder, decoder, 
                   encoder_optimizer, decoder_optimizer, criterion):
    
    # input_variable: a sequence of words (to be converted to phonemes) that are fed into the encoder
    # target_variable: a sequence of words (which is ALREADY REVERSED and not turned into phonemes) that are fed into the decoder
    # encoder, decoder: the encoder + decoder
    
    lyric_data = lyrData #this is bad srry
    
    encoder_hidden = encoder.initHidden()

    encoder_optimizer.zero_grad()
    decoder_optimizer.zero_grad()
    print(input_variable)
    print(target_variable)
    input_phonemes = [lyric_data.word2pho[w] for w in input_variable]
    #don't need to look at padding for the target variable, so just remove it
    target_variable = [x for x in target_variable if x != '<PAD>']
    
    #print(input_phonemes, target_variable)
    
    input_length = len(input_phonemes)
    target_length = len(target_variable)

    encoder_outputs = Variable(torch.zeros(max_length, encoder.hidden_size))
    encoder_outputs = encoder_outputs.cuda() if use_cuda else encoder_outputs
        
    loss = 0
    
    #phoneme_outputs = []
    for ei in range(input_length):
        encoder_output, encoder_hidden = encoder(
            input_phonemes[ei], encoder_hidden)
        encoder_outputs[ei] = encoder_output[0][0]
        #phoneme_outputs.append(encoder_ouput)
        
    #decoder_input = Variable(torch.LongTensor([[EOS_token]]))
    decoder_input = target_variable[0]
    decoder_hidden = encoder_hidden
    print(decoder_input)
    
    use_teacher_forcing = True if random.random() < teacher_forcing_ratio else False

    if use_teacher_forcing:
        # Teacher forcing: Feed the target as the next input
        for di in range(1, target_length):
            decoder_output, decoder_hidden, decoder_attention = decoder(
                decoder_input, decoder_hidden, encoder_outputs_words, phoneme_outputs)
            loss += criterion(decoder_output, target_variable[di])
            decoder_input = target_variable[di]  # Teacher forcing

    else:
        # Without teacher forcing: use its own predictions as the next input
        for di in range(1, target_length):
            decoder_output, decoder_hidden, decoder_attention = decoder(
                decoder_input, decoder_hidden, encoder_outputs_words, phoneme_outputs)
            topv, topi = decoder_output.data.topk(1)
            ni = topi[0][0]

            decoder_input = Variable(torch.LongTensor([[ni]]))
            decoder_input = decoder_input.cuda() if use_cuda else decoder_input

            loss += criterion(decoder_output, target_variable[di])
            if ni == SOS_token:
                break

    loss.backward()

    encoder_optimizer.step()
    decoder_optimizer.step()

    return loss.data[0] / target_length

In [31]:
def trainIters(loader, encoder, decoder, n_epochs, print_every=500, plot_every=100, learning_rate=0.003):
    # takes in a loader, encoder, decoder, and # of epochs (runs thru the dataset)
    start = time.time()
    print_loss_total = 0  # Reset every print_every
    plot_loss_total = 0  # Reset every plot_every

    encoder_optimizer = optim.SGD(encoder.parameters(), lr=learning_rate)
    decoder_optimizer = optim.SGD(decoder.parameters(), lr=learning_rate)
    
    criterion = nn.NLLLoss()
    for epoch in range(n_epochs):
        for i_batch, batch in loader:
            for pair in batch:
                print(pair)
                print(type(batch))
                input_variable = pair[0]
                target_variable = pair[1][::-1]
                
                loss = trainBackwards(input_variable, target_variable, encoder,
                             decoder, encoder_optimizer, decoder_optimizer, criterion)

                print_loss_total += loss
                plot_loss_total += loss

            if i_batch % print_every == 0:
                print_loss_avg = print_loss_total / print_every
                print_loss_total = 0
                print('%s (%d %d%%) %.4f' % (timeSince(start, iter / n_iters),
                                             iter, iter / n_iters * 100, print_loss_avg))

            if i_batch % plot_every == 0:
                plot_loss_avg = plot_loss_total / plot_every
                plot_losses.append(plot_loss_avg)
                total_plot_losses.append(plot_loss_avg)
                plot_loss_total = 0

    showPlot(plot_losses)

In [32]:
hidden_size = 1000
phoneme_embedding_size = 100
teacher_forcing_ratio = .5

encoder_1000_tfr_50 = phonemeOnlyEncoderRNN(lyrData.n_words, lyrData.n_phonemes,
                                        phoneme_embedding_size, hidden_size)
decoder_1000_tfr_50_do_20 = phonemeOnlyDecoderRNN(lyrData.n_words, lyrData.n_phonemes, 
                                                  phoneme_embedding_size, hidden_size, dropout_p=0.2)
dataloader = DataLoader(lyrData, batch_size=20, shuffle=True, num_workers=0)

trainIters(dataloader, encoder_1000_tfr_50, decoder_1000_tfr_50_do_20, 25, print_every=1000)

('<SOS>', '<SOS>', '<SOS>', '<SOS>', '<SOS>', '<SOS>', '<SOS>', '<SOS>', '<SOS>', '<SOS>', '<SOS>', '<SOS>', '<SOS>', '<SOS>', '<SOS>', '<SOS>', '<SOS>', '<SOS>', '<SOS>', '<SOS>')
<class 'list'>
<SOS>
>SOS<


KeyError: '<'

In [None]:
# encoder_512_tfr_50.load_state_dict(torch.load('encoder_512_tfr_50.pt'))
# decoder_512_tfr_50_do_20.load_state_dict(torch.load('decoder_512_tfr_50_do_20.pt'))

In [None]:
# TODO
# 1. get glove working
# 2. make sure network works right
# 3. order training short to long <--- might be kinda hard
# 4. evaluateAndShowAttention doesn't use beam_search
# 5. use both attentions?
# Stack phoneme lstm?
# stack other lstm? dont get how to increase layers of lstm

In [None]:
def evaluate(encoder, decoder, sentence, max_length=MAX_LEN):
    input_variable = variableFromLine(lyr, sentence)
    input_length = input_variable.size()[0]
    encoder_hidden = encoder.initHidden()

    encoder_outputs = Variable(torch.zeros(max_length, encoder.hidden_size))
    encoder_outputs = encoder_outputs.cuda() if use_cuda else encoder_outputs

    
    for ei in range(input_length):
        encoder_output, encoder_hidden = encoder(input_variable[ei],
                                                 encoder_hidden)
        encoder_outputs[ei] = encoder_outputs[ei] + encoder_output[0][0]

    
    decoder_input = Variable(torch.LongTensor([[EOS_token]]))  # EOS
    decoder_input = decoder_input.cuda() if use_cuda else decoder_input

    decoder_hidden = encoder_hidden

    decoded_words = []
    decoder_attentions = torch.zeros(max_length, max_length)

    for di in range(max_length):
        decoder_output, decoder_hidden, decoder_attention = decoder(
            decoder_input, decoder_hidden, encoder_outputs)
        decoder_attentions[di] = decoder_attention.data
        #print(decoder_attention)
        topv, topi = decoder_output.data.topk(1)
        ni = topi[0][0].cpu()
        ni = ni.numpy()
        if ni == SOS_token:
            decoded_words.append('<SOS>')
            break
        else:
            #print(decoder_output.data.topk(1))
            #print(int(ni))
            decoded_words.append(lyr.index2phonemes[int(ni)])

        decoder_input = Variable(torch.LongTensor([[int(ni)]]))
        decoder_input = decoder_input.cuda() if use_cuda else decoder_input

    return decoded_words, decoder_attentions[:di + 1]

In [None]:
def showAttention(input_sentence, output_words, attentions):
    # Set up figure with colorbar
    fig = plt.figure()
    ax = fig.add_subplot(111)
    cax = ax.matshow(attentions.numpy(), cmap='bone')
    fig.colorbar(cax)

    # Set up axes
    ax.set_xticklabels([''] + input_sentence +
                       ['<EOS>'], rotation=90)
    ax.set_yticklabels([''] + output_words)

    # Show label at every tick
    ax.xaxis.set_major_locator(ticker.MultipleLocator(1))
    ax.yaxis.set_major_locator(ticker.MultipleLocator(1))
    plt.rcParams['figure.figsize'] = [10, 5]
    plt.show()


def evaluateAndShowAttention(input_sentence, encoder, decoder):
    output_words, attentions = evaluate(encoder, decoder, input_sentence)
    print(output_words)
    print("WARNING: Attentions graph does NOT use beam_search, meaning predictions will be severely worsened")

    showAttention(input_sentence, output_words[::-1], attentions)
    return output_words

In [None]:
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import numpy as np


def showPlot(points):
    plt.figure()
    fig, ax = plt.subplots()
    # this locator puts ticks at regular intervals
    loc = ticker.MultipleLocator(base=0.2)
    ax.yaxis.set_major_locator(loc)
    plt.plot(points)

In [None]:
def evaluateRandomly(encoder, decoder, n=10):
    for i in range(n):
        pair = random.choice(pairs)
        pair_as_text = normal_pairs[pairs.index(pair)]
        print('>', pair[0])
        print('>', pair_as_text[0])
        print('=', pair[1])
        print('=', pair_as_text[1])
        phos = evaluateBeamSearch(encoder, decoder, pair[0])
        output_sentence = ' '.join(phos)
        print('<', output_sentence)
        print('')

In [None]:
teacher_forcing_ratio = 0.1
def trainAttentionBackwards(input_variable, target_variable, encoder, decoder, encoder_optimizer, decoder_optimizer, criterion, max_length=MAX_LEN):
    encoder_hidden = encoder.initHidden()

    encoder_optimizer.zero_grad()
    decoder_optimizer.zero_grad()

    input_length = input_variable.size()[0]
    target_length = target_variable.size()[0]

    encoder_outputs = Variable(torch.zeros(max_length, encoder.hidden_size))
    encoder_outputs = encoder_outputs.cuda() if use_cuda else encoder_outputs

    loss = 0
    
    for ei in range(input_length):
        #print(input_variable[ei])
        encoder_output, encoder_hidden = encoder(
            input_variable[ei], encoder_hidden)
        encoder_outputs[ei] = encoder_output[0][0]
        
    #print(input_length)
    #print(encoder_outputs.size())
    #print(encoder_outputs)
        
    # first input is EOS because we start predicting from the end of the sentence
    decoder_input = Variable(torch.LongTensor([[EOS_token]]))
    decoder_input = decoder_input.cuda() if use_cuda else decoder_input
    
    decoder_hidden = encoder_hidden

    use_teacher_forcing = True if random.random() < teacher_forcing_ratio else False

    if use_teacher_forcing:
        # Teacher forcing: Feed the target as the next input
        for di in range(1, target_length):
            decoder_output, decoder_hidden, decoder_attention = decoder(
                decoder_input, decoder_hidden, encoder_outputs)
            loss += criterion(decoder_output, target_variable[di])
            decoder_input = target_variable[di]  # Teacher forcing

    else:
        # Without teacher forcing: use its own predictions as the next input
        for di in range(1, target_length):
            decoder_output, decoder_hidden, decoder_attention = decoder(
                decoder_input, decoder_hidden, encoder_outputs)
            topv, topi = decoder_output.data.topk(1)
            ni = topi[0][0]

            decoder_input = Variable(torch.LongTensor([[ni]]))
            decoder_input = decoder_input.cuda() if use_cuda else decoder_input

            loss += criterion(decoder_output, target_variable[di])
            if ni == SOS_token:
                break

    loss.backward()

    encoder_optimizer.step()
    decoder_optimizer.step()

    return loss.data[0] / target_length

In [None]:
class CombinedEncoderRNN(nn.Module):
    def __init__(self, vocab_size, phoneme_vocab_size, word_embedding_size, phoneme_embedding_size, hidden_size):
        super(CombinedEncoderRNN, self).__init__()
        self.hidden_size = hidden_size

        # add if w/ cuda
        # self.gloveEmbedding = nn.Embedding(vocab_size, word_embedding_size).cuda()
        
        self.phonemeEmbedding = nn.Embedding(phoneme_vocab_size, phoneme_embedding_size).cuda()
        self.phonemeLSTM = nn.LSTM(phoneme_embedding_size, hidden_size, num_layers=1).cuda()
        self.phonemeLinear = nn.Linear(hidden_size, word_embedding_size).cuda() #changes size to match up with glove embedding
        
        self.scaleVector = nn.Parameter(torch.rand(word_embedding_size)).cuda()
        self.b = nn.Parameter(torch.rand(1)).cuda()
        self.scaleActivation = nn.Sigmoid()
        
        self.wordLSTM = nn.LSTM(word_embedding_size, hidden_size, num_layers=1).cuda()
        
        
    def forward(self, ipt, phoHidden):

        phonemeHidden = phoHidden
        for phoneme in phonemeFromWord(lyr, ipt):
            phonemeEmbedding = self.phonemeEmbedding(phoneme).view(1, 1, -1)
            phonemeOutput, phonemeHidden = self.phonemeLSTM(phonemeEmbedding, phonemeHidden)
            phonemeHiddenList.append(phonemeHidden)
            
        xchar = self.phonemeLinear(phonemeHidden[0])
        
        scale = self.scaleActivation(torch.dot(self.scaleVector.view(-1).float(), xchar.view(-1)) + self.b)
            
        lstmInput = (1-scale)*glove + scale*xchar
        output, hidden = self.wordLSTM(lstmInput, wordHidden)
        return output, hidden, phonemeHiddenList 

    def initHidden(self):
        result = Variable(torch.zeros(2, 1, 1, self.hidden_size))
        if use_cuda:
            return result.cuda()
        else:
            return result

In [None]:
class CombinedAttnDecoderRNN(nn.Module):
    def __init__(self, vocab_size, phoneme_vocab_size, word_embedding_size, phoneme_embedding_size, hidden_size, 
                     dropout_p=0.1, max_length=MAX_LEN):
        super(CombinedAttnDecoderRNN, self).__init__()
        self.hidden_size = hidden_size
        self.output_size = vocab_size
        self.dropout_p = dropout_p
        
        self.max_length = max_length
        
        self.phoneme_embedding_size = phoneme_embedding_size
        self.phoneme_vocab_size = phoneme_vocab_size
        
        
        #self.gloveEmbedding = nn.Embedding(vocab_size, word_embedding_size).cuda()
        
        self.attn_word = nn.Linear(self.hidden_size * 2, self.max_length).cuda()
        self.attn_word_combine = nn.Linear(self.hidden_size * 2, self.hidden_size).cuda()
        
        self.phoneme_embedding = nn.Embedding(self.phoneme_vocab_size, self.phoneme_embedding_size).cuda()
        self.attn_phoneme = nn.Linear(self.hidden_size + self.phoneme_embedding_size, self.max_length*3).cuda()
        self.attn_phoneme_combine = nn.Linear(self.hidden_size*2, self.hidden_size).cuda()
        
        self.phonemeLSTM = nn.LSTM(self.hidden_size, self.hidden_size).cuda()
        
        self.word_dropout = nn.Dropout(self.dropout_p).cuda()
        self.phoneme_dropout = nn.Dropout(self.dropout_p).cuda()
        
        self.combinedLSTM = nn.LSTM(self.hidden_size, self.hidden_size).cuda()
        self.out = nn.Linear(self.hidden_size, self.output_size).cuda()


        #input should be a word so the get_word works right
    def forward(self, input, combinedHidden, phoHidden, encoder_word_outputs, encoder_phoneme_outputs):
        
        phoneme_hidden = phoHidden
        
        for phoneme in phonemeFromWord(lyr, input):
            phoneme_embedding = self.phoneme_embedding(phoneme).view(1,1,-1)
            phoneme_embedding = self.dropout(phoneme_embedding)
            
            attn_phoneme_weights = F.softmax(
                self.attn_phoneme(torch.cat((phoneme_embedding[0], phoneme_hidden[0]), 1)), dim=1)
            
            attn_phoneme_applied = torch.bmm(attn_phoneme_weights.unsqueeze(0),
                                             encoder_phoneme_outputs.unsqueeze(0))
            
            phoneme_output = torch.cat((phoneme_embedding[0], attn_phoneme_applied[0]), 1)
            phoneme_output = self.attn_phoneme_combine(phoneme_output).unsqueeze(0) 
            phoneme_output = F.relu(phoneme_output)
            phoneme_output, phoneme_hidden = self.phonemeLSTM(phoneme_ouput, phoneme_hidden)
            
            
        word_embedding = get_word(input)
        word_embedding = self.word_dropout(word_embedding)
        
        attn_weights = F.softmax(
            self.attn(torch.cat((word_embedding[0], combinedHidden[0]), 1)), dim=1)
        attn_applied = torch.bmm(attn_weights.unsqueeze(0),
                                 encoder_outputs.unsqueeze(0))

        output = torch.cat((embedded[0], attn_applied[0]), 1)
        output = self.attn_word_combine(output).unsqueeze(0)

        output = F.relu(output)
        output, hidden = self.combinedLSTM(output, combinedHidden)

        output = F.log_softmax(self.out(output[0]), dim=1)
        return output, hidden, attn_weights

    def initHidden(self):
        result = Variable(torch.zeros(3, 1, self.hidden_size))
        if use_cuda:
            return result.cuda()
        else:
            return result

In [None]:
class AttnDecoderRNN(nn.Module):
    def __init__(self, hidden_size, output_size, dropout_p=0.1, max_length=MAX_LEN):
        super(AttnDecoderRNN, self).__init__()
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.dropout_p = dropout_p
        self.max_length = max_length

        self.embedding = nn.Embedding(self.output_size, self.hidden_size).cuda()
        self.attn = nn.Linear(self.hidden_size * 2, self.max_length).cuda()
        self.attn_combine = nn.Linear(self.hidden_size * 2, self.hidden_size).cuda()
        self.dropout = nn.Dropout(self.dropout_p).cuda()
        self.gru = nn.GRU(self.hidden_size, self.hidden_size).cuda()
        self.out = nn.Linear(self.hidden_size, self.output_size).cuda()

    def forward(self, input, hidden, encoder_outputs):
        embedded = self.embedding(input).view(1, 1, -1)
        embedded = self.dropout(embedded)

        attn_weights = F.softmax(
            self.attn(torch.cat((embedded[0], hidden[0]), 1)), dim=1)
        attn_applied = torch.bmm(attn_weights.unsqueeze(0),
                                 encoder_outputs.unsqueeze(0))

        output = torch.cat((embedded[0], attn_applied[0]), 1)
        output = self.attn_combine(output).unsqueeze(0)

        output = F.relu(output)
        output, hidden = self.gru(output, hidden)

        output = F.log_softmax(self.out(output[0]), dim=1)
        return output, hidden, attn_weights

    def initHidden(self):
        result = Variable(torch.zeros(1, 1, self.hidden_size))
        if use_cuda:
            return result.cuda()
        else:
            return result

In [None]:
class EncoderRNN(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(EncoderRNN, self).__init__()
        self.hidden_size = hidden_size

        # add if w/ cuda
        self.embedding = nn.Embedding(input_size, hidden_size).cuda()
        self.gru = nn.GRU(hidden_size, hidden_size).cuda()

    def forward(self, input, hidden):
        embedded = self.embedding(input).view(1, 1, -1)
        output = embedded
        output, hidden = self.gru(output, hidden)
        return output, hidden

    def initHidden(self):
        result = Variable(torch.zeros(1, 1, self.hidden_size))
        if use_cuda:
            return result.cuda()
        else:
            return result

In [None]:
beam_size = 5
def evaluateBeamSearch(encoder, decoder, line, reverse=True, max_length=MAX_LEN):
    input_variable = variableFromLine(lyr, line)
    input_length = input_variable.size()[0]
    encoder_hidden = encoder.initHidden()

    encoder_outputs = Variable(torch.zeros(max_length, encoder.hidden_size))
    encoder_outputs = encoder_outputs.cuda() if use_cuda else encoder_outputs
    
    for ei in range(input_length):
        encoder_output, encoder_hidden = encoder(input_variable[ei], 
                                                 encoder_hidden)
        encoder_outputs[ei] = encoder_outputs[ei] + encoder_output[0][0]

    token = EOS_token if reverse else SOS_token
    decoder_input = Variable(torch.LongTensor([[token]]))  # SOS or EOS
    decoder_input = decoder_input.cuda() if use_cuda else decoder_input

    decoder_hidden = encoder_hidden

    decoded_words = [] 
    decoder_attentions = torch.zeros(max_length, max_length)

    
    beam = beam_search(beam_size, encoder, decoder, line, decoder_input, decoder_hidden, encoder_outputs, reverse=True)
    beam_list = []
    for b in beam.pho:
        beam_list.append(lyr.index2phonemes[b.tolist()])
        
    if reverse:
        beam_list = beam_list[::-1]
    #print(beam.attentions)
    return beam_list

In [None]:
def beam_search(beam_size, encoder, decoder, line, first_input, first_hidden, encoder_outputs, old_beams=None, reverse=False):
    #print('using beam search...')
    input_variable = variableFromLine(lyr, line)
    
    # Get initial decoder outputs. The input is the not up for debate, so it starts every beam as well.
    dec_out, dec_hidden, dec_attention = decoder(
        first_input, first_hidden, encoder_outputs)
    
    
    #This will start off all of our beams.
    dec_hidden_start = dec_hidden
    
    # take out the predictions, these are our beams
    proposed_v, proposed_i = dec_out.data.topk(beam_size)

    #convert the indices to list of lists w/ one item
    proposed_i = [x for x in proposed_i[0]]
    proposed_v = [x for x in proposed_v[0]]
    
    
    if(old_beams is not None):
        beams = old_beams
    else:
        beams = []    
        for i in range(beam_size):
            beam = Beam(beam_size)

            beam.pho.append(proposed_i[i])
            beam.prob.append(proposed_v[i])
            beam.update_prob(reverse)

            beams.append(beam)
            
    #this for loop should go until all beams are EOS
    beams_finished = False
    count = 0
    while not beams_finished:
        count += 1
        extended_beams = []
        
        for j in range(len(beams)):
            extended_beams.append(beams[j].extend_beams(beam_size, encoder, decoder, dec_hidden_start, encoder_outputs, reverse))

        # we get the extended beams in lists of 5, so now extended beams is a matrix.
        # we flatten it to find the highest value easier.

        flat_list = []
        for sublist in extended_beams:
            for item in sublist:
                flat_list.append(item)
                
        flat_list = sorted(flat_list, key=lambda beam: beam.total_sum, reverse=True)
            
        beams = flat_list[:beam_size]
        
        # infinite loop, something went wrong.
        if count > 31:
            return beams[0]
            
        for beam in beams:
            if reverse:
                prediction_end = SOS_token
            else: 
                prediction_end = EOS_token
                
            if beam.pho[-1] == prediction_end or len(beam.pho) > MAX_LEN:
                beams_finished = True
                #print('success')
                return beam
                
        #print('On search %d:' % count)
        #for b in beams:
            #print(b.pho, b.total_sum)
            
    return beams


    #       When expanding this beam, check if it's valid. 
    #       if valid is false and the final pho isn't EOS, 
    #            dont expand the beam. 
    #       if valid is false and the final pho is EOS, don't expand but keep

In [None]:
class Beam(object):
    
    def __init__(self, beam_width):
        self.prob = []
        self.pho = []
        self.total_sum = 0
        self.valid = True
        self.beam_width = beam_width
        
        #this will create an error later
        self.attentions = torch.zeros(MAX_LEN, MAX_LEN)

        
    def extend_beams(self, beam_width, encoder, decoder, first_hidden, encoder_outputs, reverse):
        if reverse:
            token = SOS_token
        else:
            token = EOS_token
            
        if self.pho[-1] == token or self.valid == False:
            #print('reached the end of a beam, either it is invalid or the last phoneme is the signal to stop predictions')
            return[self]
            
        guess_hidden = first_hidden
        
        #first, run the phonemes of the beam thru the decoder, using teacher forcing the whole way
        for phoneme_index in self.pho:
            dec_input =  Variable(torch.LongTensor([[phoneme_index]]))
            dec_input = dec_input.cuda() if use_cuda else dec_input

            guess_out, guess_hidden, guess_attention = decoder(
                dec_input, guess_hidden, encoder_outputs)
            
            ind = self.pho.index(phoneme_index)
            self.attentions[ind] = guess_attention.data
            

        # second, take the top beam_size predictions of the final out and put them in new beams
        guess_v, guess_i = guess_out.topk(beam_size)

        guess_i = [x for x in guess_i[0]]
        guess_v = [x for x in guess_v[0]]

        extended_beams = []
        
        for i in range(beam_width):
            
            new_beam = Beam(beam_width)
            for n in self.pho:
                new_beam.pho.append(n)
                
            for p in self.prob:
                new_beam.prob.append(p)

            new_beam.pho.append(guess_i[i].data[0])
            new_beam.prob.append(guess_v[i].data[0]) 
            new_beam.update_prob(reverse)
            
            extended_beams.append(new_beam)

        
        #return the extended beams
        return extended_beams


    
    def update_prob(self, reverse=False):
        s = 0
        for p in self.prob:
             s += p     
        if len(self.pho) > 1:
            
            # [0, 1]
            # prev = 0
            # cur = 1
            for i in range(1, len(self.pho)):
                prev = self.pho[i-1]
                cur = self.pho[i]
                
                if reverse:
                    try:
                        s += math.log(lyr.getCondProbReverse(cur.tolist(), prev.tolist()))
                    except:
                        self.valid = False
                else:
                    # P(2|1)
                    s += math.log(lyr.getCondProb(cur, prev))

                
                
        self.total_sum = s/len(self.prob)
        

In [None]:
good_pairs = [2030,8350,3271,3013,13575,371]
for p in good_pairs:
    pair = normal_pairs[p]
    print(pair)
    print(normal_pairs.index(pair))

    l = []
    for w in clean_line(convert_to_phonemes(pair[0]), []):
        for s in w[0]:
            l.append(s)

    out = evaluateAndShowAttention(l, encoder_512_tfr_50, decoder_512_tfr_50_do_20)
    print(out.reverse())
    
evaluateRandomly(encoder_512_tfr_50, decoder_512_tfr_50_do_20)

In [None]:
# this function takes a sound and a line and returns the number of times that sound appears in that line. 
def traverseLineForMatches(sound, line):
    count = 0
    for word in line:
        try:
            for pronunciation in word:
                #print(pronunciation)
                if sound in pronunciation:
                    #print('found a matching sound!')
                    count += 1
        except:
            print('Something went wrong. Skipping this bit....')
    #print(sound)
    return count


# this takes a word (a string or a unicode) and returns the nltk pronunciations without stress numbers
# and without duplicate pronunciations. it does NOT choose the best pronunciation from the remaining
# list of unique pronunciations for that word

def wordWithoutNum(word):
    #word = word.decode('utf-8').lower()
    s = arpabet[word] #s is a list of lists
    stripped_s = []
    stripped_s_final = []
    for pronunciation in s: #pronunciation is a list of unicode strings
        stripped_p = []
        for sound in pronunciation: #for every sound, remove digits from the str
            stripped_sound = ''.join([i for i in sound if not i.isdigit()])
            stripped_p.append(stripped_sound)
        stripped_s.append(stripped_p)
        
    #sometimes removing the numbers creates duplicates, for example:
    # arpabet['the'] = [[u'DH', u'AH0'], [u'DH', u'AH1'], [u'DH', u'IY0']] 
    # after removing digits, we want [[u'DH', u'AH'], [u'DH', u'IY']]
    # The following for loop performs this removal
    
    for pro in stripped_s: 
        if pro not in stripped_s_final:
            stripped_s_final.append(pro)
                
    return stripped_s_final



# this function will ideally take a line and a previous line, where line is an uncleaned list of lists of phonemes,
# and prev_line is a cleaned line of phonemes,
# and returns the first arguement with no words with multiple pronunciations

def clean_line(line, prev_line): 

    cleaned_line = []
    line_as_pure_phonemes = [item for word in line for item in word]
    line_as_pure_phonemes = [sound for word in line_as_pure_phonemes for sound in word]
    # look at every word in the line
    # Line looks like this 
#     [[[u'IH', u'N']],     In
#     [[u'DH', u'AH'], [u'DH', u'IY'], [u'TH', u'AH'], [u'TH', u'IY']], the
#     [[u'M', u'IH', u'S', u'T']], mist 
#     [[u'DH', u'OW']], though
#     [[u'B', u'AH', u'T']], but
#     [[u'DH', u'AH'], [u'DH', u'IY'], [u'TH', u'AH'], [u'TH', u'IY']], the
#     [[u'R', u'IH', u'TH', u'S']], ryth's
#     [[u'M', u'UW', u'V']], move
#     [[u'IH', u'N']]] in
    
    
    
    # word looks like this, here's "the": [[u'DH', u'AH'], [u'DH', u'IY'], [u'TH', u'AH'], [u'TH', u'IY']] 
    for word in line:
        
        pronunciation_vowels = []
        pronunciation_vowels_scores = []
        
        
        
        if len(word) != 1:
            #print(word)
            # pronunciation looks like this: [u'DH', u'AH'], so word[0] = u'DU'
            for pronunciation in word:
                #Get the vowels for the pronunciations
                pronunciation_vowels.append(get_vowels(pronunciation))
                
            
            # v is a list of vowels, could be one but maybe more. is a list
            for v in pronunciation_vowels:
                count = 0
                # for each sound in this v, tally the number of times it appears in the unfiltered list
                for sound in v:
                    count += line_as_pure_phonemes.count(sound)
                    count += prev_line.count(sound)
                    
                count = count / float(len(v)) #take the average, in case there are 3 vowels in on pro, and 2 in the other
                pronunciation_vowels_scores.append(count)
               
            #find which vowel had the highest count
            max_score = max(pronunciation_vowels_scores)

            #get the location of the highest count vowel
            max_score_index = pronunciation_vowels_scores.index(max_score)
            
            max_score_vowel = pronunciation_vowels[max_score_index]
            cleaned_line.append([word[max_score_index]])
            #print(pronunciation_vowels)
            #print(pronunciation_vowels_scores)
            
        # otherwise, only one pronunciation, so append it to the list
        else:
            cleaned_line.append(word)

    return cleaned_line

#this is a sample starting line
#prev_line = remove_lists(convert_to_phonemes("ten after one i think i'll hop the horse"))
#clean_line(convert_to_phonemes("hangin' in the good day feelin' good"), prev_line)
# print(convert_to_phonemes("a whitened sandwich and again it stopped"))
# print(convert_to_phonemes("a derelick makes a real long speech"))
# print(convert_to_phonemes("in the mist though but the rhyth's move in"))

In [None]:
def convert_to_phonemes(s):
    #filename = 'testfile'
    #global arpabet
    #filename = os.path.join("home", "m3","nltk_data", "corpora", "cmudict", "cmudict")
    #print(os.path.abspath(filename))
    #filename = "/home/m3/nltk_data/corpora/cmudict/cmudict"
    s = s.split(' ')
    line_phonemes = []
    word_phonemes = []
    pure = True
    
    for word in s:
        try:
            line_phonemes.append(wordWithoutNum(word))
        except:
            if word[-3:] == "in'":
                word_in = word
                word = word[:-3] + 'ing'
                ing_as_phonemes = convert_to_phonemes(word) 
                print(ing_as_phonemes)
                for pro in ing_as_phonemes[0]:
                    #print(pro)
                    pro[-1] = u'N'
                line_phonemes.append(ing_as_phonemes)
                #with open(filename,"a") as f:
                    #print("ing_as_phonemes:", ing_as_phonemes)
                    #print("ing_as_phonemes[0]:", ing_as_phonemes[0])
                    #print("ing_as_phonemes[0][0]:", ing_as_phonemes[0][0])
                    #print(word_in.upper() + " 1 " + " ".join(map(str, ing_as_phonemes[0][0])))
                    #f.write(word_in.upper() + " 1 " + " ".join(map(str, ing_as_phonemes[0][0])))
                    #f.write("\n")
                #arpabet = nltk.corpus.cmudict.dict()
            elif "'" in word:
                try:
                    line_phonemes.append(wordWithoutNum(word.replace("'","")))
                except:
                    pass
                    #print('Deleting an apostrophe didn\'t work')
            elif word == '':
                pass
            else:
                pure = False
    return line_phonemes
# print(convert_to_phonemes("hangin' in the good day feelin' good"))
# print(convert_to_phonemes("a whitened sandwich and again it stopped"))
# print(convert_to_phonemes("a derelick makes a real long speech"))
#l = convert_to_phonemes("after hours 'it' was cool")
#l2 = convert_to_phonemes("ten after one i think i'll hop the horse")

In [None]:
total_plot_losses = []
def trainItersAttentionBackwards(encoder, decoder, n_iters, plot_losses, print_every=1000, plot_every=100, learning_rate=0.01):
    start = time.time()
    print_loss_total = 0  # Reset every print_every
    plot_loss_total = 0  # Reset every plot_every

    encoder_optimizer = optim.SGD(encoder.parameters(), lr=learning_rate)
    decoder_optimizer = optim.SGD(decoder.parameters(), lr=learning_rate)
    training_pairs = [variablesFromPair(random.choice(pairs))
                      for i in range(n_iters)]
    criterion = nn.NLLLoss()

    for iter in range(1, n_iters + 1):
        training_pair = training_pairs[iter - 1]
        input_variable = training_pair[0]
        target_variable = training_pair[1]
        
        #print(target_variable)
        
        
        # create inverted indices
        idx = [i for i in range(target_variable.size(0)-1, -1, -1)]
        idx = Variable(torch.cuda.LongTensor(idx))
        #print(idx)
        inverted_tensor = target_variable.index_select(0, idx)
        #print(inverted_tensor)
        reversed_list = []
        for r in range(len(target_variable.data)):
            reversed_list.append(target_variable.data[(len(target_variable.data)-1)-r][0])
        
        target_variable = inverted_tensor

        loss = trainAttentionBackwards(input_variable, target_variable, encoder,
                     decoder, encoder_optimizer, decoder_optimizer, criterion)
        print_loss_total += loss
        plot_loss_total += loss

        if iter % print_every == 0:
            #torch.save(encoder.state_dict(), 'encoder_128_tfr_80.pt')
            #torch.save(decoder.state_dict(), 'decoder_128_tfr_80_do_50.pt')
            print_loss_avg = print_loss_total / print_every
            print_loss_total = 0
            print('%s (%d %d%%) %.4f' % (timeSince(start, iter / n_iters),
                                         iter, iter / n_iters * 100, print_loss_avg))

        if iter % plot_every == 0:
            plot_loss_avg = plot_loss_total / plot_every
            plot_losses.append(plot_loss_avg)
            total_plot_losses.append(plot_loss_avg)
            plot_loss_total = 0

    showPlot(plot_losses)

In [None]:
MAX_LENGTH=30
def prepareData(name, reverse=False):
    # read in all files. Check if each line pair of every song of every artist rhymes
    # if it rhymes, put it 
    
    lyr, pairs = readLyrics(name, reverse)
    print("Reading %s sentence pairs" % len(pairs))
    ## CONVERT TO PHONEMES HERE
    pairs_as_phonemes = []
    pairs_not_as_phonemes = []
    print("Counting words...")
    num_of_pairs = 0
    for pair in pairs:
        cur_phonemes = []
        cur_phonemes.append(convert_to_phonemes(pair[0]))
        cur_phonemes.append(convert_to_phonemes(pair[1]))
        cur_phonemes[0] = clean_line(cur_phonemes[0], cur_phonemes[1])
        cur_phonemes[1] = clean_line(cur_phonemes[1], cur_phonemes[0])
        
        cur_phonemes[0] = [sound for sublist in cur_phonemes[0] for sound in sublist]
        cur_phonemes[1] = [sound for sublist in cur_phonemes[1] for sound in sublist]
        
        foo = []
        for w in cur_phonemes[0]:
            for s in w:
                if s in list_of_phonemes:
                    foo.append(s)
                
        cur_phonemes[0] = foo
        
        foo = []
        for w in cur_phonemes[1]:
            for s in w:
                if s in list_of_phonemes:
                    foo.append(s)
                
        cur_phonemes[1] = foo
        if len(cur_phonemes[0]) < MAX_LENGTH and len(cur_phonemes[1]) < MAX_LENGTH:
            num_of_pairs += 1
            pairs_as_phonemes.append(cur_phonemes)
            pairs_not_as_phonemes.append(pair)
            lyr.addLine(cur_phonemes[0])
            lyr.addLine(cur_phonemes[1])
        
    print("Pairs under ", MAX_LENGTH)
    print(num_of_pairs)
    
    return lyr, pairs_as_phonemes, pairs_not_as_phonemes


lyr, pairs, normal_pairs = prepareData('rap')
print('done')

In [None]:
def readLyrics(name):
    
    lyr = Lyrics(name)
    
    print('Making Phoneme Dictionary....')
    
    # These files 
    words = [word.rstrip('\n') for word in open('words_in_lyrics')]
    pronunciations = [word.rstrip('\n') for word in open('pho_dict')]
    
    for ind in range(len(words)):
        pro = pronunciations[ind].split()
        lyr.addWord2Phoneme(words[ind], pro)
        lyr.addWord(words[ind])
        
    
    pairs = []
    table = str.maketrans('', '', string.punctuation)
    print("Reading lyrics...")
    error_counter = 0
    pair_counter = 0
    for file in os.listdir('lyric_files'):
        print(file)
        with open('lyric_files/' +file) as f:
            data = json.load(f)
            for song in data['songs']:
                lines = [w.translate(table).lower() for w in song['lyrics'].split('\n')]
                lines_filtered = [i for i in lines if len(i)>0]
                lines = lines_filtered #is this needed?
                for ind in range(len(lines)-1):
                    line1 = lines[ind].split()
                    line2 = lines[ind+1].split()
                    
                    #this gets rid of weird non-ascii characters like right quote and stuff like that 
                    line1_filtered = [w.encode('ascii',errors='ignore').decode() for w in line1 if len(w.encode('ascii',errors='ignore').decode())>0]
                    line2_filtered = [w.encode('ascii',errors='ignore').decode() for w in line2 if len(w.encode('ascii',errors='ignore').decode())>0]

                    try:
                        line1_vowels = get_vowels(lyr.word2phoneme[line1_filtered[-1]])
                        line2_vowels = get_vowels(lyr.word2phoneme[line2_filtered[-1]])

                        # this is a check to make sure all words in the line have glove embeddings
                        # I think this is also good bcus it'll get rid of really weird words.... might lose some stuff
                        for w in line1_filtered:
                            get_word(w)

                        for v in lyr.word2phoneme[line1_filtered[-1]]:
                            lyr.addPhoneme(v)

                        if line1_vowels[-1] == line2_vowels[-1]:
                            pairs.append((line1_filtered, line2_filtered))
                            pair_counter += 1
                    except:
                        #print("Some error occurred here:")
                        #print(line1_filtered, line1_filtered)
                        error_counter +=1
                        
    print(error_counter, pair_counter)

            
    return lyr, pairs
lyr, pairs = readLyrics('rap')
MAX_LENGTH = 30

In [None]:
SOS_token = 0
EOS_token = 1
PAD_token = 2

class Lyrics:
    def __init__(self, name):
        self.name = name
        self.phonemes2index = {}
        self.phonemes2count = {}
        self.index2phonemes = {}
        self.phonemes2prob = {}
        self.phonemes2probReverse = {}
        
        self.words2index = {}
        self.words2count = {}
        self.index2words = {0: "<SOS>", 1: "<EOS>", 2: "<PAD>"}
        self.words2prob = {}
        self.words2probReverse = {}
        
        self.n_words = 2  # Count SOS and EOS
        self.n_phonemes = 0
        
        self.word2phoneme = {}
        
        
    def addWord2Phoneme(self, word, phonemes):
        self.word2phoneme[word] = phonemes

    # Takes a line of phonemes and adds it
    def addLine(self, line):
        for word in line:
            self.addWord(word)
            
        
        for i in range(len(line)):
            if i+1 < len(line):
                self.addProb(line[i], line[i+1])
            if i-1 >= 0:
                self.addProbReverse(line[i-1], line[i])
            
    def addWord(self, word):
        if word not in self.words2index:
            self.words2index[word] = self.n_words
            self.words2count[word] = 1
            self.index2words[self.n_words] = word
            self.n_words += 1
        else:
            self.words2count[word] += 1
            
        
    def addPhoneme(self, phoneme):
        if phoneme not in self.phonemes2index:
            self.phonemes2index[phoneme] = self.n_phonemes
            self.phonemes2count[phoneme] = 1
            self.index2phonemes[self.n_phonemes] = phoneme
            self.n_phonemes += 1
        else:
            self.phonemes2count[phoneme] += 1
            
            
    # Add a counter for P(second|first)
    def addProb(self, first, second):
        if first not in self.phonemes2prob:
            self.phonemes2prob[first] = np.zeros(41)
            self.phonemes2prob[first][self.phonemes2index[second]] += 1
        else:
            self.phonemes2prob[first][self.phonemes2index[second]] += 1
        
        
    # first should be the first phoneme in the sentence when it's read as it would be spoken
    # [u'K', u'AE', u'N', u'AY', u'K', u'IH', u'K', u'IH', u'T'] -->  K is first, AE is 2nd
    # Add a counter for P(first|second)
    def addProbReverse(self, first, second):
        if second not in self.phonemes2probReverse:
            self.phonemes2probReverse[second] = np.zeros(41)
            self.phonemes2probReverse[second][self.phonemes2index[first]] += 1
        else:
            self.phonemes2probReverse[second][self.phonemes2index[first]] += 1
            
            
    # P(2|1)
    def getCondProb(self, first, second):
        f = self.index2phonemes[first]
        s = self.index2phonemes[second]
        return self.phonemes2prob[f][s]/self.phonemes2count[f]
    
    # P(1|2)
    def getCondProbReverse(self, first, second):
        f = self.index2phonemes[first]
        s = self.index2phonemes[second]
        #a = self.phonemes2probReverse[s][first]
        #print(a)
        return self.phonemes2probReverse[s][first]/self.phonemes2count[s]
    