# Data Pre-processing

In [402]:
import os
import json

## Load lyric files

In [403]:
# Root directory of the dataset, given as a path relative to the working directory.
dataset_path = '../../dataset'

In [404]:
def get_song_files(root_dir: str) -> dict:
    """Map each song ID to the path of its JSON file inside ``root_dir``.

    Only directory entries whose name ends in ``.json`` are considered. The
    song ID is the file name without its extension, converted to ``int``.
    """
    id_to_path = {}
    for entry in os.listdir(root_dir):
        if not entry.endswith('.json'):
            continue
        stem, _ext = os.path.splitext(os.path.basename(entry))
        id_to_path[int(stem)] = os.path.join(root_dir, entry)
    return id_to_path

In [405]:
def load_songs(file_dict: dict) -> dict:
    """Load every song JSON file into memory.

    Args:
        file_dict: Mapping of song ID --> path of that song's JSON file.

    Returns:
        Mapping of song ID --> the object decoded from that file.
    """
    song_dict = {}
    for song_id, path in file_dict.items():
        # Read as UTF-8 explicitly so that non-ASCII lyrics decode the same way
        # regardless of the platform's locale encoding.
        with open(path, 'r', encoding='utf-8') as fp:
            song_dict[song_id] = json.load(fp)
    return song_dict

In [406]:
# Index the JSON files in the dataset's 'songs' sub-directory by song ID,
# then load the content of every file.
songid_to_file = get_song_files(os.path.join(dataset_path,'songs'))
songid_to_song = load_songs(songid_to_file)

## Cleanse lyrics

In [407]:
import collections
import itertools
import pickle
import re
from typing import List, Optional

import nltk

class LyricsVocab:
    """Vocabulary and index vectors built from a collection of song lyrics.

    Attributes set by ``__init__``:
        corpus: Unique lyric lines as lists of tokens, sorted by decreasing
            length, each ending with ``EOS_token``.
        counts: ``collections.Counter`` of token --> number of occurrences in
            the corpus (EOS is not counted; tokens below ``min_count`` are removed).
        index2token: List of tokens where the position is the token's index.
            The reserved tokens come first, followed by the sorted vocabulary.
        token2index: Dictionary of token --> index.
        vectors: ``corpus`` with every token replaced by its index.
    """

    # List of tokens that have reserved corpus index.
    PAD_token = '<pad>'
    SOS_token = '<sos>'
    EOS_token = '<eos>'
    UNK_token = '<unk>'
    reserved_tokens = [PAD_token, SOS_token, EOS_token, UNK_token]

    def __init__(self, songs: dict, min_count: Optional[int] = None, min_len: Optional[int] = None):
        """Build the corpus, the vocabulary and the index vectors.

        Args:
            songs: Mapping of song ID --> song record. Each record must provide
                its lyrics as a string under the 'lyrics' key.
            min_count: When given (and non-zero), tokens occurring fewer times
                than this are left out of the vocabulary and encoded as UNK.
            min_len: When given (and non-zero), lines with fewer tokens than
                this (not counting EOS) are discarded.
        """
        # Clean the lyrics of each song and chain all songs into one stream of token lines.
        lines = itertools.chain.from_iterable(self.clean_lyrics(song['lyrics']) for song in songs.values())

        # Remove sentence duplicates. A dict keeps first-occurrence order, whereas a
        # set of strings iterates in an order that changes between interpreter runs,
        # which would make the order of equal-length lines non-reproducible.
        unique_lines = dict.fromkeys(map(tuple, lines))
        self.corpus = [list(line) for line in unique_lines]

        # Sort corpus by decreasing length (stable, so ties keep first-occurrence order).
        self.corpus.sort(key=len, reverse=True)

        # Remove any lines that are below the threshold.
        if min_len:
            self.corpus = [line for line in self.corpus if len(line) >= min_len]

        # Generate counts for each word.
        self.counts = collections.Counter(itertools.chain.from_iterable(self.corpus))

        # Remove any counts below the threshold.
        if min_count:
            self.counts = collections.Counter(
                {token: count for token, count in self.counts.items() if count >= min_count}
            )

        # Add EOS token to the end of each sentence (after counting, so EOS is not in counts).
        for line in self.corpus:
            line.append(self.EOS_token)

        # Generate list of unique words; reserved tokens take the first indices.
        self.index2token = self.reserved_tokens + sorted(self.counts)

        # Build mappings for: token --> index
        self.token2index = {token: i for i, token in enumerate(self.index2token)}

        # Generate index vectors based on word assignment.
        # Since some words may have been removed, replace all unknown with the UNK token.
        unk_index = self.token2index[self.UNK_token]
        self.vectors = [[self.token2index.get(token, unk_index) for token in line] for line in self.corpus]

    def save(self, path: str, protocol: int = pickle.DEFAULT_PROTOCOL):
        """Save vocabulary to a pickle file.

        The pickle file is a dictionary which contains the following keys:
            index2token: List of tokens, where the list position is the token index
            token2index: Dictionary of token --> index
            vectors: 2D list of token indices, one row per corpus line
            counts: Counter of token --> number of occurrences
            corpus: 2D list of tokens, one row per line (each ending with EOS)

        Args:
            path: Destination file; it is opened for binary writing.
            protocol: Pickle protocol passed to ``pickle.dump``.
        """
        store = {
            'index2token': self.index2token,
            'token2index': self.token2index,
            'vectors': self.vectors,
            'counts': self.counts,
            'corpus': self.corpus,
        }
        with open(path, 'wb') as fp:
            pickle.dump(store, fp, protocol=protocol)

    @staticmethod
    def decontracted(phrase: str) -> str:
        """Remove English word contractions.

        Gleaned from: https://stackoverflow.com/a/47091490

        The substitutions are applied in order: the specific forms must run
        before the general suffix rules ("won't" before "n't", "n't" before "'t").
        Note that "'s" is always expanded to " is", and "wanna"/"gotta" are
        replaced wherever they appear, including inside longer words.
        """
        substitutions = (
            # specific
            (r"won\'t", "will not"),
            (r"can\'t", "can not"),
            (r"wanna", "want to"),
            (r"gotta", "got to"),
            # general
            (r"n\'t", " not"),
            (r"\'re", " are"),
            (r"\'s", " is"),
            (r"\'d", " would"),
            (r"\'ll", " will"),
            (r"\'t", " not"),
            (r"\'ve", " have"),
            (r"\'m", " am"),
        )
        for pattern, replacement in substitutions:
            phrase = re.sub(pattern, replacement, phrase)
        return phrase

    @classmethod
    def clean_lyrics(cls, lyric: str) -> List[List[str]]:
        """Split the lyrics of one song into lines of lower-case alphabetic tokens.

        Args:
            lyric: Raw lyrics text of a single song.

        Returns:
            One list of tokens per non-empty line. Periods also end a line.
        """
        lyric = lyric.lower() # Convert to common case.
        lyric = re.sub(r'\[[^\]]*\]', '', lyric) # Remove bracketed content "[*]", like markers for chorus and verses.
        lyric = re.sub(r'\([^\)]*\)', '', lyric) # Remove parenthetical content "(*)".
        lyric = lyric.strip() # Remove any extra newlines at the ends.
        lyric = cls.decontracted(lyric) # Remove contractions before tokenizer to handle special cases.

        # Replace any periods with newlines to ensure sentences end with a newline.
        lyric = re.sub(r"\.", r'\n', lyric)

        # Preserve line structure because tokenizer will remove traditional newlines.
        lyric = re.sub(r"(?:\s*\n\s*)+", r'\n', lyric) # Remove repeated newlines.
        # Re-map newlines so that tokenizer doesn't remove them. The marker is upper
        # case, so it cannot collide with the (lower-cased) lyric words, and it is
        # purely alphabetic, so it survives the isalpha() filter below.
        lyric = re.sub('\n', ' NEWLINE ', lyric)

        # Tokenize the entire song into list of words.
        tokens = nltk.tokenize.word_tokenize(lyric) # Split into word tokens.
        tokens = [word for word in tokens if word.isalpha()] # Careful to remove punct after contractions.

        # Group sentences together by line: keep the runs of tokens between markers.
        # groupby never yields an empty group, so no extra filtering is required.
        tokens_lines = [
            list(group)
            for is_marker, group in itertools.groupby(tokens, lambda s: s == 'NEWLINE')
            if not is_marker
        ]
        return tokens_lines

In [408]:
# Create vocabulary object.
# min_count=3: words occurring fewer than 3 times are left out of the vocabulary (encoded as <unk>).
# min_len=3: lines with fewer than 3 words are discarded.
lv = LyricsVocab(songid_to_song, min_count=3, min_len=3)

In [409]:
# Summary statistics of the vocabulary.
# Note: index2token also contains the 4 reserved tokens, and every corpus line
# ends with <eos>, so the reported sentence lengths include that extra token.
print(f"{len(songid_to_song)} songs")
print(f"{len(lv.corpus)} unique lines")
print(f"{len(lv.index2token)} unique words")
# The corpus is sorted by decreasing length: first entry is the longest, last the shortest.
print(f"Longest sentence: {len(lv.corpus[0])} words")
print(f"Shortest sentence: {len(lv.corpus[-1])} words")

1339 songs
28735 unique lines
2778 unique words
Longest sentence: 132 words
Shortest sentence: 4 words


In [410]:
# Show the last (shortest) corpus line next to its index vector.
print(f"Line:      {lv.corpus[-1]}")
print(f"Embedding: {lv.vectors[-1]}")

Line:      ['from', 'the', 'honeycomb', '<eos>']
Embedding: [947, 2383, 3, 2]


## Write vocabulary and index vectors to pickle file

In [411]:
# Store the vocabulary as 'lyrics.pickle' in the dataset directory.
path = os.path.join(dataset_path, 'lyrics.pickle')
lv.save(path)
print(f'Lyrics vocabulary saved: {path}')

Lyrics vocabulary saved: ../../dataset/lyrics.pickle
