In [6]:
import pandas as pd
df = pd.read_csv("df_lyrics.csv")
import nltk
nltk.download('punkt')
import re
from nltk.corpus import stopwords
import nltk.tokenize as nltk
import math
from nltk.corpus import stopwords
import nltk
nltk.download('stopwords')
from tqdm.notebook import tqdm

[nltk_data] Downloading package punkt to /Users/zdrxxxi07/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/zdrxxxi07/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [28]:
# preprocess text function
def preprocessText(words):
    stop_words = set(stopwords.words('english'))
    # remove punctuation
    words = re.sub(r'[^\w\s-]', '', words)
    # remove numbers
    words = re.sub(r'\d+', '', words)
    # tokenize
    words = nltk.word_tokenize(words)
    # remove stop words
    words = [word for word in words if word not in stop_words]
    # split hyphen words
    final = []
    for word in words:
        if "-" in word:
            final.extend(word.split("-"))
        else:
            final.append(word)
    return final

# return a dictionary of dictionaries
# {Song_name: {lyrics_word_1: tf, lyrics_word_2: idf_score .....}}
# this function is the query idf function used to generate based on query_collection
def createQueryIdf(collections):
    collection_len = collections.shape[0]
    each_song_words = list() # in a format of song_name: list of all words in this song lyrics
    IDF_dict = dict()
    for index,song in collections.iterrows():
        song_lyrics = preprocessText(song["Lyrics"])
        each_song_words.append(song_lyrics)
    for index,song in tqdm(collections.iterrows()):
        song_lyrics = preprocessText(song["Lyrics"])
        IDF_dict[song["Song"]] = dict()
        for word in song_lyrics:
            count = 0
            for i in each_song_words:
                if word in i:
                    count+=1
            IDF_dict[song["Song"]][word] = math.log(collection_len/count)
    return IDF_dict

# return a dictionary of dictionaries
# {Song_name: {lyrics_word_1: tf, lyrics_word_2: tf .....}}
# this function is the query tf function used to generate based on query_collection counts
def countQueryTF(collections):
    TF_dict = dict()
    for index,song in tqdm(collections.iterrows()):
        TF_dict[song["Song"]] = dict()
        song_lyrics = preprocessText(song["Lyrics"])
        for word in set(song_lyrics):
            count = 0
            for i in song_lyrics:
                if word == i:
                    count +=1
            TF_dict[song["Song"]][word] = count
    return TF_dict

# this is only for query vectors creation
# takes in the return value of createIdf function and countTF function
# return vectors for the whole collection based on idf_dict entry length
def createVector(idf_dict, TF_dict):
    return_vector_dict = dict()
    for song_name, idfs in tqdm(idf_dict.items()):
        return_vector_dict[song_name] = dict()
        for word, idf in idf_dict[song_name].items():
            return_vector_dict[song_name][word] = idf*TF_dict[song_name][word]
    return return_vector_dict

# take the orginal set of data and split into different dictionaries based on genre groups
# the result is a dictionary with each song lyrics in a list
# e.g. Pop: ["Lyrics of song1", "Lyrics of song2" ...]
def groupLyrics(old_abstracts):
    genre_type = set(old_abstracts["Genre"])
    grouped = old_abstracts.groupby(['Genre'])
    genre_lyrics = dict()
    for genre in genre_type:
        genre_lyrics_list = list()
        df = grouped.get_group(genre)
        for index, song in df.iterrows():
            genre_lyrics_list.append(song["Lyrics"])
        genre_lyrics[genre] = genre_lyrics_list
    return genre_lyrics

# return a dictionary in the format of {index : ["{Song}_{index}", "Lyrics", "Genre"]}
# note that later I convert this dictionary into a dataframe
# the Song name then become an arbitrary name I formatted as "{Genre}_{index}", so if the song originally is a Rock song
# the new name of the song might be "Rock_1"
def createCombinedLyrics(grouped_dict, split_num):
    ret_dict = dict()
    global_count = 0
    count = 0
    new_lyrics = ""
    for genre_name, lyrics in grouped_dict.items():
        for song_lyrics in lyrics: 
            if count == split_num:
                global_count += 1
                ret_dict[global_count] = [genre_name+"_"+str(global_count),new_lyrics, genre_name]
                count = 0
                new_lyrics = ""
            new_lyrics += " "
            new_lyrics += song_lyrics
            count+=1
    return ret_dict

# cosine similarity function
# takes in two vectors, pumps out their cosine similarity
def cosineSim(vec1,vec2):
    dotproduct = 0
    for i in range(0, len(vec1)):
        dotproduct += vec1[i] * vec2[i]
    mag1 = (sum(val**2 for val in vec1))
    mag2 = (sum(val**2 for val in vec2))
    if mag1 == 0 or mag2 == 0:
        return 0
    return dotproduct / math.sqrt(mag1*mag2)

In [32]:
#this block is only for create query versus abstracts context not for query to query

#return: {abstract_name: {word: word_count_in_abstract}}
# this function counts the abstract word count so that you can directly know in 
# which abstract how many certain words there are
# you can search use: return_dict["abstract_name"]["word"], this gives you the word count in this abstract
def countAbstractWord(abs_collections):
    abs_dict = dict()
    for index,song in abs_collections.iterrows():
        abs_lyrics = preprocessText(song["Lyrics"])
        abs_dict[song["Song"]] = dict()
        for word in abs_lyrics:
            try:
                abs_dict[song["Song"]][word]+=1
            except:
                abs_dict[song["Song"]][word]=1
    return abs_dict

# return a dictionary
# {Query_song_name: [corresbonding idf with the abstract collection]}
def createAbsIdf(qry_collections, abs_dict):
    collection_len = len(abs_dict)
    each_song_words = dict() # in a format of song_name: list of all words in this song lyrics
    IDF_dict = dict()
    for index,song in qry_collections.iterrows():
        song_lyrics = preprocessText(song["Lyrics"])
        each_song_words[song["Song"]] = song_lyrics
    for query_name,query_lyrics in tqdm(each_song_words.items()):
        IDF_dict[query_name] = list()
        for word in query_lyrics:
            count = 1
            for abs_song, word_count in abs_dict.items():
                try:
                    temp = word_count[word]
                    count+=1
                except:
                    continue
            IDF_dict[query_name].append(math.log(collection_len/count))
    return IDF_dict

# take in the query_song and the abstract collection
# return a dictionry of dictionaries
# {query_song: {abstract_song_name: [list of tf values]}}
def createAbsTF(qry_collections, abs_dict):
    TF_dict = dict()
    each_song_words = dict() # in a format of song_name: list of all words in this song lyrics
    for index,song in qry_collections.iterrows():
        song_lyrics = preprocessText(song["Lyrics"])
        each_song_words[song["Song"]] = song_lyrics
    for song_name,song_lyrics in tqdm(each_song_words.items()):
        TF_dict[song_name] = dict()
        for abstract_name,word_count in abs_dict.items():
            temp_list = list()
            for word in song_lyrics:
                try:
                    num = word_count[word]
                except:
                    num = 0
                temp_list.append(num)
            TF_dict[song_name][abstract_name] = temp_list
    return TF_dict

# take in the above return value and create vector in a format of
# {query_song: {abstract_name: [TFIDF vector]}}
def createAbstractVector(idf_dict, TF_dict):
    return_vector_dict = dict()
    for song_name, info in tqdm(TF_dict.items()):
        return_vector_dict[song_name] = dict()
        for abstract, tf in TF_dict[song_name].items():
            return_vector_dict[song_name][abstract] = list()
            for i in range(0, len(tf)):
                return_vector_dict[song_name][abstract].append(tf[i]*idf_dict[song_name][i])
    return return_vector_dict

In [9]:
import math

# GOAL:
# need to read the data file and create TFIDF vectors for queries and abstracts

# format of data
# ARTIST, SONG, LYRICS, GENRE

# devote 20% of the data for testing
# the other 80% can be for development/training
df = pd.read_csv("df_lyrics.csv")

# need to shuffle the data
df = df.sample(frac=1, random_state=42)
# Reset the index after shuffling
df.reset_index(drop=True, inplace=True)

# then devote 20% to testing
# Calculate the number of rows for the test set (20% of the original DataFrame)
test_size = int(0.2 * len(df))

# Take a random sample of 20% of the rows for the test set
test_df = df.sample(n=test_size, random_state=42)

# Create the training set (80% of the original DataFrame)
train_df = df.drop(test_df.index)

# Reset the index of the training and test sets
train_df.reset_index(drop=True, inplace=True)
test_df.reset_index(drop=True, inplace=True)

print(train_df.shape[0])
# 13 percent of total are the query songs

# we need to separate queries and abstracts
# 9635 is approximately 15% of the length train_df
query_songs = train_df.head(300)

# Remove the first 9635 rows from the original DataFrame
train_df = train_df.iloc[300:]

# Reset the index of the dataframes
query_songs.reset_index(drop=True, inplace=True)
train_df.reset_index(drop=True, inplace=True)

# debug some counts for each genre
print(train_df['Genre'].value_counts())



#group combined lyrics based on genre within abstracts
new_train_df = createCombinedLyrics(groupLyrics(train_df),2)
new_train_df = pd.DataFrame.from_dict(new_train_df, orient='index', columns=['Song', 'Lyrics', 'Genre'])

# IDF and TF score
query_idf = createQueryIdf(query_songs)
query_TF = countQueryTF(query_songs)
new_train_df_word_count = countAbstractWord(new_train_df)
train_df_idf = createAbsIdf(query_songs, new_train_df_word_count)
train_df_TF = createAbsTF(query_songs, new_train_df_word_count)

64228
Rock       17587
Country    17259
Rap        14580
Pop        14502
Name: Genre, dtype: int64


0it [00:00, ?it/s]

0it [00:00, ?it/s]

  0%|          | 0/300 [00:00<?, ?it/s]

  0%|          | 0/300 [00:00<?, ?it/s]

In [10]:
# create vectors
queryVectors = createVector(query_idf,query_TF)

  0%|          | 0/300 [00:00<?, ?it/s]

In [11]:
abstractVectors = createAbstractVector(train_df_idf,train_df_TF)

  0%|          | 0/300 [00:00<?, ?it/s]

In [14]:
similarityScores = dict()#to hold similarity scores to compare later

for song, vector in queryVectors.items():
    similarityScores[song] = dict()
    for abstract, vector in abstractVectors[song].items():
        cosineScore = cosineSim(list(queryVectors[song].values()),abstractVectors[song][abstract])
        #get cosine sim of song and each abstract
        similarityScores[song][abstract]= cosineScore # record the cosine similarity

for k in similarityScores:
    similarityScores[k] = sorted(similarityScores[k].items(), key=lambda x: x[1], reverse=True)
    

In [34]:
similarityScores["Avalon"]

'Country'

In [15]:
# take the average of scores over each genre and then take the largest value as the predicted result
# now dictionary similarityScores looks like:
# {Song_name: [(Abstract_name_1, similarity score), (Abstract_name_1, similarity score), (Abstract_name_1, similarity score)]}
for song in similarityScores:
    song_dict = dict()
    for genre_score in similarityScores[song]:
        # get the abstract genre type cuz I formatted in "Genre_Index"
        genre = genre_score[0].split("_")[0]
        # the similarity score is always the second entry in this tuple
        score = genre_score[1]
        # in case the genre has never been seen, first try to add one more score into list song_dict[genre]
        # or just create the list song_dict[genre] and add in the socre
        try:
            song_dict[genre].append(score)
        except:
            song_dict[genre] = [score]
    # song_dict looks like {Genre: [list of all similarity scores]}/ this is for each song cuz in the loop
    # tracks of the current high score in genres
    high_score = 0
    result = str()
    for genre in song_dict:
        # calculate the average similarity score within each genre
        cur_score = sum(song_dict[genre])/len(song_dict[genre])
        # if the score if higher than the current high score
        # replace and record the high_score genre name
        if cur_score>high_score:
            result = genre
    similarityScores[song] = result

predict_result = similarityScores

predict_result

{'Playa Hater': 'Rap',
 'Dazed and Confused (How the West Was Won)': 'Rock',
 'The Glory of Love': 'Rap',
 'Apples': 'Rock',
 'Here Come the Warm Jets': 'Pop',
 'Christmas Must Be Tonight': 'Pop',
 'Im Tired': 'Pop',
 'Heaven, Hell or Houston': 'Country',
 'Would You': 'Pop',
 'Church In The Wildwood': 'Rock',
 'I Still Luv You': 'Rock',
 'In Another Land': 'Pop',
 'Rich & Sad': 'Rock',
 'Water': 'Country',
 'Black Pearl': 'Country',
 'Sixty Years On': 'Pop',
 'Echo': 'Country',
 'Frisky': 'Pop',
 'DarkSide': 'Pop',
 'Chemo Limo': 'Pop',
 'Evolve': 'Country',
 'Kick Drum': 'Country',
 'Meatplow': 'Country',
 'Benjamin Twine': 'Pop',
 'Farewell Party': 'Rock',
 'Rule The World': 'Pop',
 'A.K.A.': 'Rap',
 'CHURCH GIRL': 'Pop',
 'Let U Down': 'Rap',
 'Make You Feel That Way': 'Rock',
 'We Want Weezy (intro)': 'Rock',
 'Excursions': 'Pop',
 'Summers End': 'Pop',
 'Lost Cause': 'Country',
 'Bout That Life': 'Country',
 'Ripples': 'Rap',
 'Blue Slide Park': 'Country',
 'Ripchord': 'Country',

In [16]:
# use the original query song as answer
answer = dict()
for i in query_songs.index:
    cur_song = query_songs.iloc[i]
    answer[cur_song["Song"]] = cur_song["Genre"]

In [17]:
#calculate accuracy
total_song_num = len(answer)
accuracy = sum([1 for k in answer if answer[k] == predict_result[k]])/total_song_num

In [18]:
accuracy

0.22333333333333333