In [None]:
import numpy as np
import pandas as pd
import string
import nltk
import re
from sklearn.model_selection import train_test_split
import tensorflow
import keras
from keras.models import Sequential
from keras.layers import Embedding, Bidirectional, LSTM, Dense,  Activation
from keras.callbacks import ModelCheckpoint, LambdaCallback, EarlyStopping


# Folder that holds both CSV files used below.
# NOTE(review): this is a machine-specific absolute path; point DATA_DIR at
# your local copy of the data before running the notebook.
DATA_DIR = "/Users/jlsan/Documents/GitHub/cs4120-final/musicoset_songfeatures"

# Load lyrics data from MusicOSet (tab-separated file).
# Should show a dataframe with 20000 song_ids and their lyrics.
df = pd.read_csv(DATA_DIR + "/lyrics.csv", sep="\t")
df.info()

# Drop every row that has a missing value in any column.
df = df.dropna()

# Add poems from The Poetry Foundation (14000 poems, with the author and tags
# associated with each poem). Both sources are combined to increase data quality.
pdf = pd.read_csv(DATA_DIR + '/PoetryFoundationData.csv', quotechar='"')

# Translation table that deletes all ASCII punctuation; used to clean the text
# before training.
translator = str.maketrans('', '', string.punctuation)

# splits lyrics into intro, verses, and chorus, only selects first 4 verses + chorus
def split_text(x, table=None):
    r"""Extract the cleaned verse/chorus lines of one song as a single string.

    The lyrics string uses escaped line breaks (a literal backslash followed
    by ``n``). Sections are separated by two of them and are identified by a
    bracketed header such as ``[Verse 1]`` or ``[Chorus: Artist]`` (anything
    after a colon in the header is ignored). Only sections whose header is
    ``Verse 1`` .. ``Verse 4`` or ``Chorus`` are kept.

    Args:
        x: Row-like object exposing the lyrics under the ``'lyrics'`` key.
        table: Optional ``str.translate`` table applied to every kept line.
            Defaults to the module-level ``translator``.

    Returns:
        pd.Series with a single entry ``'single_text'``: the kept lines,
        lower-cased, with parentheses removed and ``table`` applied, joined
        by ``' \n '`` (space, newline, space). Empty string when no section
        matches.
    """
    if table is None:
        table = translator

    wanted_sections = {'Verse 1', 'Verse 2', 'Verse 3', 'Verse 4', 'Chorus'}
    kept_lines = []

    for section in x['lyrics'].split('\\n\\n'):
        # Header text is whatever sits between the first '[' and first ']'.
        header = section[section.find('[') + 1:section.find(']')].strip()
        if ':' in header:
            header = header[:header.find(':')]
        if header not in wanted_sections:
            continue

        body = section[section.find(']') + 1:]
        # Lines of length 0 or 1 (measured before cleaning) are skipped.
        kept_lines += [
            line.lower().replace('(', '').replace(')', '').translate(table)
            for line in body.split('\\n')
            if len(line) > 1
        ]

    # Join once, after all sections have been processed.
    return pd.Series({'single_text': ' \n '.join(kept_lines)})
# Attach each song's cleaned verse/chorus text as a new 'single_text' column.
df = df.join(df.apply(split_text, axis=1))

# Preview a few rows instead of dumping the whole frame.
df.head()

In [None]:
# Clean the poem text: lower-case and strip every non-empty line, remove
# punctuation, and join the lines with ' \n ' so the line-break token matches
# the format produced for the lyrics. Rows with a missing poem are skipped and
# end up with NaN in 'single_text'.
pdf['single_text'] = pdf['Poem'].dropna().apply(
    lambda poem: ' \n '.join(
        [l.lower().strip().translate(translator) for l in poem.splitlines() if len(l) > 0]
    )
)

# Combine the lyrics and the poems into one single-column frame.
# Only the 'single_text' columns are stacked: concatenating the full lyrics
# frame would leave the poem rows with NaN in the lyrics-only columns, and the
# dropna() below would then discard every poem.
sum_df = pd.concat([df[['single_text']], pdf[['single_text']]], ignore_index=True)
sum_df = sum_df.dropna()

In [None]:
text_as_list = []  # Flat list of every token in the corpus (words and '\n' line-break tokens)
frequencies = {}   # token -> number of occurrences in text_as_list
uncommon_words = set()  # Tokens that occur fewer than MIN_FREQUENCY times
MIN_FREQUENCY = 7  # Minimum frequency threshold for a token to enter the vocabulary
MIN_SEQ = 5        # Number of tokens in each input sequence
BATCH_SIZE = 32    # Batch size for the data generator

def extract_text(text):
    """Append the tokens of one cleaned text to the global ``text_as_list``.

    The text is split on single spaces; empty/whitespace-only pieces are
    dropped, except the ``'\\n'`` token, which is kept as a line-break marker.
    """
    global text_as_list
    text_as_list += [word for word in text.split(' ') if word.strip() != '' or word == '\n']

# Tokenise the combined corpus (lyrics + poems).
# NOTE(review): this previously iterated over df['single_text'] (lyrics only),
# which left the combined sum_df unused; confirm the combined corpus is intended.
for text in sum_df['single_text']:
    extract_text(text)
print('Total words:', len(text_as_list))

for word in text_as_list:
    frequencies[word] = frequencies.get(word, 0) + 1

# Split the tokens into rare ones (excluded) and the sorted vocabulary.
uncommon_words = {key for key, count in frequencies.items() if count < MIN_FREQUENCY}
words = sorted(key for key, count in frequencies.items() if count >= MIN_FREQUENCY)
num_words = len(words)
word_indices = {word: i for i, word in enumerate(words)}
indices_word = {i: word for i, word in enumerate(words)}
print('Words with less than {} appearances: {}'.format(MIN_FREQUENCY, len(uncommon_words)))
print('Words with more than {} appearances: {}'.format(MIN_FREQUENCY, len(words)))

valid_seqs = []      # Input windows of MIN_SEQ consecutive tokens
end_seq_words = []   # The token that follows each window (prediction target)

# Slide a window of MIN_SEQ + 1 tokens over the corpus and keep it only when
# neither the inputs nor the target contain a rare token.
for i in range(len(text_as_list) - MIN_SEQ):
    window = text_as_list[i:i + MIN_SEQ + 1]
    if uncommon_words.isdisjoint(window):
        valid_seqs.append(window[:MIN_SEQ])
        end_seq_words.append(window[MIN_SEQ])

print('Valid sequences of size {}: {}'.format(MIN_SEQ, len(valid_seqs)))

# Hold out 2% of the (sequence, next word) pairs for validation.
X_train, X_test, y_train, y_test = train_test_split(valid_seqs, end_seq_words, test_size=0.02, random_state=42)


In [None]:
# Data generator for fit and evaluate
def generator(sentence_list, next_word_list, batch_size):
    """Yield (inputs, targets) batches forever, cycling through the data.

    Args:
        sentence_list: Sequence of token sequences; each token is looked up
            in the module-level ``word_indices``.
        next_word_list: Target token for each entry of ``sentence_list``.
        batch_size: Number of samples per yielded batch.

    Yields:
        Tuple ``(x, y)`` of int32 arrays: ``x`` has shape
        ``(batch_size, MIN_SEQ)`` and holds the token indices of each
        sequence, ``y`` has shape ``(batch_size,)`` and holds the index of
        the corresponding next token.
    """
    cursor = 0
    while True:
        batch_x = np.zeros((batch_size, MIN_SEQ), dtype=np.int32)
        batch_y = np.zeros((batch_size), dtype=np.int32)
        for row in range(batch_size):
            # Wrap around so the stream never runs out of samples.
            pos = cursor % len(sentence_list)
            for col, token in enumerate(sentence_list[pos]):
                batch_x[row, col] = word_indices[token]
            batch_y[row] = word_indices[next_word_list[pos]]
            cursor += 1
        yield batch_x, batch_y

# Function to sample the next word
def sample(preds, temperature=1.0):
    """Sample an index from a probability array after temperature scaling.

    The probabilities are re-weighted as ``softmax(log(preds) / temperature)``
    and one index is drawn from the resulting distribution. Lower
    temperatures concentrate the draw on the most likely entries.

    Args:
        preds: 1-D array-like of non-negative probabilities.
        temperature: Scaling factor. A value ``<= 0`` selects the most
            likely index deterministically instead of dividing by zero.

    Returns:
        The sampled index (as returned by ``np.argmax``).
    """
    preds = np.asarray(preds).astype('float64')
    if temperature <= 0:
        return np.argmax(preds)

    # log(0) legitimately maps to -inf (probability zero); silence the warning.
    with np.errstate(divide='ignore'):
        logits = np.log(preds) / temperature

    peak = np.max(logits)
    if not np.isfinite(peak):
        # Every probability is zero (or invalid): nothing sensible to sample.
        return np.argmax(preds)

    # Subtract the maximum before exponentiating so that at least one term is
    # exp(0) == 1; otherwise low temperatures can underflow every term to 0
    # and turn the normalisation into 0 / 0.
    exp_preds = np.exp(logits - peak)
    probs = exp_preds / np.sum(exp_preds)
    probas = np.random.multinomial(1, probs, 1)
    return np.argmax(probas)

# Function to generate text at the end of each epoch
def on_epoch_end(epoch, logs=None):
    """Write sample generated text to ``examples_file`` after an epoch.

    Meant to be passed as the ``on_epoch_end`` hook of a ``LambdaCallback``.
    A random seed sequence is picked from the training and test sequences,
    and for each diversity (sampling temperature) 50 further words are
    generated with the module-level ``model``.

    Relies on the module-level names ``examples_file``, ``model``,
    ``X_train``, ``X_test``, ``word_indices``, ``indices_word``, ``MIN_SEQ``
    and ``sample``.

    Args:
        epoch: Epoch number passed by the callback; written to the header line.
        logs: Unused.
    """
    examples_file.write('\n----- Generating text after Epoch: %d\n' % epoch)

    # Randomly pick a seed sequence from train + test. The index is mapped
    # onto the two lists directly instead of concatenating them, which would
    # copy both full lists on every epoch.
    n_train = len(X_train)
    seed_index = np.random.randint(n_train + len(X_test))
    seed = X_train[seed_index] if seed_index < n_train else X_test[seed_index - n_train]

    for diversity in [0.3, 0.4, 0.5, 0.6, 0.7]:
        # Work on a copy so every diversity starts from the same seed.
        sentence = list(seed)
        examples_file.write('----- Diversity:' + str(diversity) + '\n')
        examples_file.write('----- Generating with seed:\n"' + ' '.join(sentence) + '"\n')
        examples_file.write(' '.join(sentence))
        for _ in range(50):
            # Encode the current window as a (1, MIN_SEQ) array of word indices.
            x_pred = np.zeros((1, MIN_SEQ))
            for t, word in enumerate(sentence):
                x_pred[0, t] = word_indices[word]
            preds = model.predict(x_pred, verbose=0)[0]
            next_index = sample(preds, diversity)
            next_word = indices_word[next_index]

            # Slide the window: drop the oldest word, append the new one.
            sentence = sentence[1:] + [next_word]

            examples_file.write(" " + next_word)
        examples_file.write('\n')
    examples_file.write('=' * 80 + '\n')
    examples_file.flush()

In [None]:
# defines and builds the model architecture
def get_model():
    """Assemble the (uncompiled) next-word prediction network.

    The stack is: an embedding over the ``num_words`` vocabulary with
    1024-dimensional vectors, a bidirectional LSTM with 128 units, a dense
    layer with ``num_words`` outputs, and a softmax activation.

    Returns:
        The ``Sequential`` model with all four layers added.
    """
    print('Building model...')
    network = Sequential()
    layer_stack = (
        Embedding(input_dim=num_words, output_dim=1024),
        Bidirectional(LSTM(128)),
        Dense(num_words),
        Activation('softmax'),
    )
    for layer in layer_stack:
        network.add(layer)
    return network


In [None]:
# Build the model architecture
model = get_model()

# Compile the model. Targets are integer word indices, hence the sparse loss.
model.compile(loss='sparse_categorical_crossentropy', optimizer="adam", metrics=['accuracy'])

# File path template for model checkpoints. The %d fields are filled in here;
# the {…} fields are filled in by ModelCheckpoint at save time.
file_path = "./checkpoints/LSTM_LYRICS-epoch{epoch:03d}-words%d-sequence%d-minfreq%d-" \
           "loss{loss:.4f}-acc{accuracy:.4f}-val_loss{val_loss:.4f}-val_acc{val_accuracy:.4f}.keras" % \
           (num_words, MIN_SEQ, MIN_FREQUENCY)

# Save a checkpoint only when validation accuracy improves
checkpoint = ModelCheckpoint(file_path, monitor='val_accuracy', save_best_only=True)

# Write generated text to the examples file at the end of each epoch
print_callback = LambdaCallback(on_epoch_end=on_epoch_end)

# Early stopping to prevent overfitting.
# NOTE(review): patience (15) exceeds the number of epochs (10) below, so this
# callback cannot trigger with the current settings.
early_stopping = EarlyStopping(monitor='val_accuracy', patience=15)

# List of callbacks
callbacks_list = [checkpoint, print_callback, early_stopping]

# Train the model. The examples file is opened in a `with` block so it is
# closed even if training fails; on_epoch_end writes to it via the
# module-level name `examples_file`.
with open('examples.txt', "w") as examples_file:
    history = model.fit(
        generator(X_train, y_train, BATCH_SIZE),
        # One pass over the training split per epoch.
        steps_per_epoch=len(X_train) // BATCH_SIZE + 1,
        epochs=10,
        callbacks=callbacks_list,
        # Validate on the held-out inputs with their own targets, for one
        # pass over the held-out split.
        validation_data=generator(X_test, y_test, BATCH_SIZE),
        validation_steps=len(X_test) // BATCH_SIZE + 1,
    )
