<a href="https://colab.research.google.com/github/AntJuLRa/ANLP-project-/blob/dev/generation/word_level_LSTM_generation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
import pickle
import numpy as np
import torch
import nltk
import random
import torch.nn as nn
import torch.nn.functional as fnc
# Seed every random number generator available to this notebook so runs are
# repeatable: `random` drives the phrase sampling (random.choice), NumPy is
# imported above, and PyTorch drives weight initialisation and the
# multinomial sampling used during generation.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

<torch._C.Generator at 0x7fc736869b58>

In [3]:
# Load the pickled lyrics corpus. Later cells iterate it as a collection of
# songs, each of which is a sequence of word tokens.
# NOTE(review): pickle.load can execute arbitrary code while unpickling --
# only load "OHHLAdata.p" if it comes from a trusted source.
with open("OHHLAdata.p", "rb") as corpus_file:
    # The context manager closes the file handle once loading is finished.
    OHHLA_data = pickle.load(corpus_file)

In [4]:
# Concatenate the tokens of every song into one flat list.
flat_data = []
for song in OHHLA_data:
    flat_data.extend(song)

In [5]:
# Count how many times each token occurs across the whole flattened corpus.
freq_dict= nltk.FreqDist(flat_data)

In [6]:
# Partition the vocabulary in a single pass: words seen more than 30 times
# are kept, everything else is treated as out-of-vocabulary.
valid_vocab, invalid_vocab = [], []
for word, freq in freq_dict.items():
    if freq > 30:
        valid_vocab.append(word)
    else:
        invalid_vocab.append(word)

In [7]:
# Display the number of words kept in the vocabulary (frequency above 30).
len(valid_vocab)

10844

In [26]:
# Word -> integer id. Ids start at 1; id 0 is not assigned to any word here
# (phrase_to_tensor uses 0 for words that are missing from the mapping).
index_dict = {word: idx for idx, word in enumerate(valid_vocab, start=1)}

# Inverse lookup: integer id -> word.
rev_index_dict = {idx: word for word, idx in index_dict.items()}

def phrase_to_tensor(phrase, ind_dict):
  """Encode a phrase as a 1-D tensor of word ids.

  Args:
    phrase: iterable of lines, each line an iterable of words.
    ind_dict: mapping from word to integer id.

  Returns:
    A ``torch.long`` tensor with one id per word, in reading order. Words
    that are not keys of ``ind_dict`` are encoded as 0.
  """
  ids = [ind_dict.get(token, 0) for line in phrase for token in line]
  return torch.tensor(ids, dtype=torch.long)

In [27]:
from itertools import groupby
# We train on sequences of four lines taken from within a single verse, so
# first split every song into verses at the 'nxtvrse' marker token (the
# marker itself is dropped).
verses = []
for song in OHHLA_data:
  for is_marker, tokens in groupby(song, lambda token: token == 'nxtvrse'):
    if not is_marker:
      verses.append(list(tokens))

In [28]:
def get_phrases(verse,num_lines=4,endline='endline'):
  """Return every window of ``num_lines`` consecutive lines in a verse.

  The verse is split into lines at each ``endline`` token; every line is
  stored as a list of its words followed by one ``endline`` token.

  Args:
    verse: iterable of word tokens, with ``endline`` tokens separating lines.
    num_lines: number of consecutive lines per phrase (expected to be >= 1).
    endline: the token that marks the end of a line.

  Returns:
    A list of phrases, each a list of exactly ``num_lines`` lines. A verse
    with fewer than ``num_lines`` lines yields an empty list.
  """
  lines = [list(words) + [endline]
           for is_separator, words in groupby(verse, lambda token: token == endline)
           if not is_separator]
  # Clamp at zero: a negative count used as a slice bound would wrap around
  # and produce phrases shorter than num_lines for short verses.
  window_count = max(0, len(lines) - num_lines + 1)
  return [lines[first:first + num_lines] for first in range(window_count)]

In [29]:
# Phrases grouped per verse ...
list_of_phrases = list(map(get_phrases, verses))

# ... and the same phrases flattened into one list.
phrases = []
for verse_phrases in list_of_phrases:
  phrases.extend(verse_phrases)

In [30]:
# Positions (in `phrases`) of every phrase that contains at least one word
# occurring 30 times or fewer in the corpus.
invalid_phrases = [
  position
  for position, phrase in enumerate(phrases)
  if any(freq_dict[word] <= 30 for line in phrase for word in line)
]

In [31]:
# Positions of the phrases that were not flagged in invalid_phrases: all
# phrase positions minus the invalid ones. The order of the resulting list is
# whatever iterating the set difference yields.
valid_phrases = list(set(list(range(0,len(phrases)))) - set(invalid_phrases))

In [32]:
def random_valid_phrase():
  """Return one phrase from `phrases`, chosen uniformly among `valid_phrases`."""
  return phrases[random.choice(valid_phrases)]

In [35]:
def random_training_set():
    """Draw a random valid phrase and build a next-word prediction pair.

    Returns:
        (input, target): two id tensors of equal length; ``target`` is the
        encoded phrase shifted one position to the left of ``input``.
    """
    encoded = phrase_to_tensor(random_valid_phrase(), index_dict)
    # Each input token's target is the token that follows it.
    return encoded[:-1], encoded[1:]

**LSTM for text generation**

In [81]:
class LSTM(nn.Module):
    """Word-level LSTM language model that consumes one token per call.

    Architecture: embedding -> multi-layer LSTM -> linear layer -> log-softmax.

    Args:
        input_dim: number of rows in the embedding table (largest id + 1).
        output_dim: number of classes scored by the output layer.
        hidden_dim: size of the LSTM hidden and cell states.
        num_layers: number of stacked LSTM layers.
        embedding_dim: size of each word embedding.
    """

    def __init__(self, input_dim, output_dim, hidden_dim, num_layers, embedding_dim=100):
        super().__init__()
        self.i_dim = input_dim
        self.e_dim = embedding_dim
        self.h_dim = hidden_dim
        self.o_dim = output_dim
        self.n_layers = num_layers

        self.embedding = nn.Embedding(self.i_dim, self.e_dim)
        self.lstm = nn.LSTM(input_size=self.e_dim, hidden_size=self.h_dim, num_layers=self.n_layers)
        self.out = nn.Linear(self.h_dim, self.o_dim)

    def forward(self, inp, hidden_cell):
        """Advance the model by one token.

        Args:
            inp: tensor holding a single token id (scalar or one element).
            hidden_cell: ``(hidden, cell)`` state pair, e.g. from ``init_hidden``.

        Returns:
            (log_probs, (hidden, cell)): ``log_probs`` has shape
            ``(1, output_dim)`` and holds log-probabilities over the classes.
        """
        embedded = self.embedding(inp)
        # nn.LSTM expects (seq_len, batch, features); here both are 1.
        lstm_out, hidden = self.lstm(embedded.view(1, 1, -1), hidden_cell)
        res = self.out(lstm_out.view(1, -1))
        res = fnc.log_softmax(res, dim=1)

        return res, hidden

    def init_hidden(self):
        """Return zeroed ``(hidden, cell)`` states for a batch of one.

        The states are created with the dtype and device of the model's own
        parameters, so they stay compatible after ``.to(...)``/``.double()``.
        """
        reference = self.out.weight
        state_shape = (self.n_layers, 1, self.h_dim)
        hidden = torch.zeros(state_shape, dtype=reference.dtype, device=reference.device)
        cell = torch.zeros(state_shape, dtype=reference.dtype, device=reference.device)
        return hidden, cell

In [82]:
def generate(LSTM_model, start='i', max_len=150, num_lines=4, temp=0.8):
    """Sample a word sequence from the model, beginning with ``start``.

    Args:
        LSTM_model: model exposing ``init_hidden()`` and a call signature
            ``model(token, (hidden, cell)) -> (log_probs, (hidden, cell))``.
        start: first word of the generated text.
        max_len: maximum number of words to sample after ``start``.
        num_lines: stop once this many 'endline' tokens have been produced.
        temp: sampling temperature; log-probabilities are divided by it
            before exponentiation, so lower values give less random output.

    Returns:
        list of str: ``start`` followed by the sampled words (including the
        'endline' tokens).
    """
    hidden, cell = LSTM_model.init_hidden()
    prime_input = phrase_to_tensor([[start]], index_dict)
    predicted = [start]

    # Sampling needs no gradients, so skip building the autograd graph.
    with torch.no_grad():
        # Warm the state up on every priming token except the last one: the
        # last token is the first input of the sampling loop below, and
        # feeding it here as well would show it to the model twice.
        for p in range(len(prime_input) - 1):
            _, (hidden, cell) = LSTM_model(prime_input[p], (hidden, cell))
        current = prime_input[-1]

        line_count = 0
        for _ in range(max_len):
            if line_count >= num_lines:
                break

            output, (hidden, cell) = LSTM_model(current, (hidden, cell))

            # Turn temperature-scaled log-probabilities into sampling weights.
            output_dist = output.view(-1).div(temp).exp()
            # Id 0 stands for "unknown word" and has no entry in
            # rev_index_dict, so it must never be sampled.
            output_dist[0] = 0
            i = int(torch.multinomial(output_dist, 1)[0])
            predicted_next = rev_index_dict[i]

            if predicted_next == 'endline':
                line_count += 1

            predicted.append(predicted_next)
            # The sampled id is fed back as the next input.
            current = torch.tensor([i], dtype=torch.long)

    return predicted

In [83]:
import time, math

def time_since(since):
    """Format the wall-clock time elapsed since ``since`` as 'Xm Ys'.

    Args:
        since: a timestamp previously obtained from ``time.time()``.
    """
    elapsed = time.time() - since
    minutes = math.floor(elapsed / 60)
    seconds = elapsed - minutes * 60
    return '%dm %ds' % (minutes, seconds)

In [84]:
def training_step(LSTM_model, optimizer, input, target, sample_len, criterion):
    """Run one optimisation step on a single (input, target) sequence.

    The sequence is fed to the model token by token, the per-token losses are
    summed, and one backward pass plus one optimizer step is performed.

    Args:
        LSTM_model: model exposing ``init_hidden()``, ``zero_grad()`` and a
            call signature ``model(token, (hidden, cell))``.
        optimizer: optimizer whose ``step()`` applies the update.
        input: tensor of input token ids.
        target: tensor of target token ids aligned with ``input``.
        sample_len: number of leading positions to train on.
        criterion: loss called as ``criterion(output, target_token)``.

    Returns:
        float: the summed loss divided by ``sample_len``.
    """
    LSTM_model.zero_grad()
    hidden, cell = LSTM_model.init_hidden()

    step_losses = []
    for position in range(sample_len):
        prediction, (hidden, cell) = LSTM_model(input[position], (hidden, cell))
        step_losses.append(criterion(prediction, target[position].view(1)))

    # sum() starts from 0 and adds the per-token losses in order.
    total_loss = sum(step_losses)
    total_loss.backward()
    optimizer.step()

    return total_loss.item() / sample_len

In [85]:
# --- Hyper-parameters ---------------------------------------------------
n_epochs = 1000     # number of training steps; each step uses one random phrase
print_every = 100   # print the loss and a generated sample every N steps
plot_every = 10     # record the averaged loss every N steps
hidden_size = 128
n_layers = 2
# Word ids run from 1 to len(index_dict) and id 0 is used for unknown words,
# so the embedding table and the output layer need len(index_dict) + 1
# entries. With only len(index_dict) the highest word id is out of range.
n_items = len(index_dict) + 1

lr = 0.005
LSTM_model = LSTM(n_items, n_items, hidden_size, n_layers, embedding_dim=100)
optimizer = torch.optim.Adam(LSTM_model.parameters(), lr=lr)
# LSTM.forward already returns log-probabilities; CrossEntropyLoss applies
# log_softmax once more, which leaves normalised log-probabilities unchanged.
criterion = nn.CrossEntropyLoss()

start = time.time()
all_losses = []
loss_avg = 0

for epoch in range(1, n_epochs+1):
    input_ids, target_ids = random_training_set()
    loss = training_step(LSTM_model, optimizer, input_ids, target_ids, len(target_ids), criterion)
    loss_avg += loss

    if epoch % print_every == 0:
        print('[{} ({} {}%) {:.4f}]'.format(time_since(start), epoch, epoch/n_epochs * 100, loss))
        print(generate(LSTM_model), '\n')

    if epoch % plot_every == 0:
        # Store the mean loss of the last plot_every steps, then reset.
        all_losses.append(loss_avg/ plot_every)
        loss_avg = 0

[0m 31s (100 10.0%) 5.8028]
['i', 'left', 'endline', 'it', 'endline', 'the', 'that', 'kick', 'for', 'but', 'writing', 'endline', "'", "'s", 'sounds', 'your', 'he', 'woman', 'a', 'the', 'shit', 'endline'] 

[1m 1s (200 20.0%) 6.0860]
['i', 'anybody', 'will', 'and', 'gets', "'", 'you', 'endline', 'you', 'your', 'for', 'a', 'off', 'endline', 'on', 'attention', 'with', 'and', 'of', 'kill', 'roll', 'endline', 'they', 'her', 'to', 'endline'] 

[1m 32s (300 30.0%) 6.5253]
['i', "'s", 'he', 'with', 'just', 'day', 'your', 'endline', 'ai', 'nigga', 'endline', 'shit', 'me', 'endline', 'if', 'endline'] 

[2m 4s (400 40.0%) 6.6513]
['i', 'you', 'endline', 'there', 'mind', 'the', 'endline', 'nigga', 'still', 'the', 'these', 'endline', "'s", 'think', 'your', 'whole', 'endline'] 

[2m 37s (500 50.0%) 5.8209]
['i', 'how', 'her', 'said', 'endline', 'a', 'seas', 'endline', 'for', 'endline', 'i', "'m", 'the', 'ground', 'shit', 'endline'] 

[3m 7s (600 60.0%) 6.4451]
['i', 'to', "'ll", 'endline', 'to', 'ba

In [None]:
# Re-initialise the model and train it again from scratch. This cell reuses
# the hyper-parameters defined in the previous training cell (n_epochs,
# print_every, plot_every, hidden_size, n_layers, lr).
# Word ids run from 1 to len(index_dict) and id 0 is used for unknown words,
# so the model needs len(index_dict) + 1 embedding rows and output classes.
n_items = len(index_dict) + 1
LSTM_model = LSTM(n_items, n_items, hidden_size, n_layers, embedding_dim=100)
optimizer = torch.optim.Adam(LSTM_model.parameters(), lr=lr)
criterion = nn.CrossEntropyLoss()

start = time.time()
all_losses = []
loss_avg = 0

for epoch in range(1, n_epochs+1):
    input_ids, target_ids = random_training_set()
    loss = training_step(LSTM_model, optimizer, input_ids, target_ids, len(target_ids), criterion)
    loss_avg += loss

    if epoch % print_every == 0:
        print('[{} ({} {}%) {:.4f}]'.format(time_since(start), epoch, epoch/n_epochs * 100, loss))
        print(generate(LSTM_model), '\n')

    if epoch % plot_every == 0:
        # Store the mean loss of the last plot_every steps, then reset.
        all_losses.append(loss_avg/ plot_every)
        loss_avg = 0