In [None]:
# macOS (shell escape; `%pip3` is not an IPython magic)
# !pip3 install lyricsgenius

# others (installs into the running kernel's environment)
# %pip install lyricsgenius


In [None]:
# import files
import pandas as pd
import os
import lyricsgenius as lg
import re

In [None]:
"""
import spotify csv files with customised settings
"""
def import_csv(file_path: str):
  """
  Load one Spotify chart CSV into a DataFrame.

  file_path: location of the csv file to read
  return DataFrame: rows of the file, column names taken from the first row (header=0)
  """
  csv_frame = pd.read_csv(file_path, header=0)
  return csv_frame

def remove_extra_whitespace(text: str):
  """
  Collapse every run of whitespace in `text` to a single space and drop
  leading/trailing whitespace.

  text: string to normalise
  return string: the words of `text` joined by single spaces
  """
  # split() without a separator splits on any whitespace run and discards empty pieces
  return " ".join(text.split())

# check that the RawData folder exists; the chart csv files are read from here
import_folder_path = os.path.join("..", "datasets", "RawData")
if not os.path.exists(import_folder_path):
  # FileNotFoundError is still an Exception, so existing handlers keep working
  raise FileNotFoundError("Dataset RawData not found! Download Datasets from Spotify")

# export folder path; created if missing (exist_ok avoids a separate exists() check)
export_folder_path = os.path.join("..", "datasets", "CompileData")
os.makedirs(export_folder_path, exist_ok=True)

In [None]:
"""
combine all songs
- calculate sum of streams
"""
def combine_all_songs(df: pd.DataFrame):
  """
  Collapse the rows of `df` into one entry per track and total the streams.

  df: DataFrame with the columns 'uri', 'track_name', 'artist_names' and 'streams'
  return list: one dict per track with the keys 'track_name', 'artist',
    'streams' (sum over all rows of that track) and 'uid', ordered by
    streams from highest to lowest

  The name and artist of a track are taken from the first row in which it appears.
  """
  totals_by_uid = {}

  for row in df.to_dict('records'):
    # the uri has the form spotify:track:XXXXXX; the trailing XXXXXX identifies
    # the track (names alone could collide between different songs)
    track_uid = row['uri'].split(':')[-1]

    if track_uid in totals_by_uid:
      # track seen before: only the stream count is accumulated
      entry = totals_by_uid[track_uid]
      entry['streams'] = entry['streams'] + row['streams']
      continue

    totals_by_uid[track_uid] = {
      "track_name": row['track_name'],
      "artist": row['artist_names'],
      "streams": row['streams'],
      "uid": track_uid,
    }

  # dict -> list, then order by total streams, highest first
  ranked_songs = list(totals_by_uid.values())
  ranked_songs.sort(key=lambda entry: entry['streams'], reverse=True)

  return ranked_songs


"""
iterate and combine errors of song to one element
"""
def merge_song_errors(song_error_list: list):
  """
  Merge the logged lyric-lookup failures so each song appears exactly once.

  song_error_list: list of dicts with the keys 'track_name', 'uid', 'artist' and 'year'
  return list: one dict per uid (same keys); 'year' is a string holding every
    year the song failed in, joined with ', ' in the order they were logged
  """
  error_song_dict = {}

  for song_error in song_error_list:
    uid: str = song_error['uid']
    year: int = song_error['year']
    song_in_list = error_song_dict.get(uid)

    if song_in_list is not None:
      # already recorded: extend its year list in place. The entry is the
      # object stored under `uid`, so no re-insertion is needed (writing it
      # under the literal key 'uid' would add a duplicate row to the result).
      song_in_list['year'] = '{0}, {1}'.format(song_in_list['year'], year)
    else:
      error_song_dict[uid] = {
        'track_name': song_error['track_name'],
        'uid': uid,
        'artist': song_error['artist'],
        'year': str(year)
      }

  return list(error_song_dict.values())


In [None]:
"""
iterate and get songs from dict
"""
def find_song_lyrics(song_list: list):
  """
  Attach lyrics to every song dict in `song_list`.

  song_list: list of song dicts; 'track_name', 'artist' and 'uid' are read
    (each defaults to '' when missing)
  return list: the same dict objects, each with a 'lyrics' key added in place

  Lyrics come from get_lyrics() and are stored wrapped in double quotes.
  When the lookup raises, the error is printed, the song is appended to the
  global list_of_songs_with_error together with the global start_year, and
  its 'lyrics' is set to "".
  """
  songs_with_lyrics = []

  for entry in song_list:
    title = entry.get('track_name', '')
    performer = entry.get('artist', '')

    try:
      fetched_lyrics = "\"" + get_lyrics(title, performer) + "\""
    except Exception as error:
      # report the failure and remember the song for the error export
      print('[' + title + ']:', error)
      list_of_songs_with_error.append({
        'track_name': title,
        'uid': entry.get('uid', ''),
        'artist': performer,
        'year': start_year
      })
      fetched_lyrics = ""

    entry['lyrics'] = fetched_lyrics
    songs_with_lyrics.append(entry)

  return songs_with_lyrics

In [None]:
# declare constant Genius
# The API token is read from the GENIUS_ACCESS_TOKEN environment variable so it
# never has to be written into (and committed with) the notebook. When the
# variable is unset the value is "" - the same placeholder as before.
token = os.environ.get("GENIUS_ACCESS_TOKEN", "")
genius = lg.Genius(
  token,
  skip_non_songs=True,
  excluded_terms=["(Live)", "(Original Motion Picture Soundtrack)"],
  remove_section_headers=True,
  timeout=10,
  sleep_time=1,
  retries=3
)

# title cleanup: a parenthesised or bracketed group that does not start with
# "Remix" - e.g. (feat ...) | (from ...) | (with ...) - plus all text after it
remove_paranthesis_pattern = r'\((?!Remix)\s*[^()]*\).*|\[(?!Remix)\s*[^()]*\].*'
# "<digits>Embed" marker on the last lyrics line (e.g. 200Embed). The word is
# matched literally: a character class like [Embed]+ would also swallow
# ordinary text such as "2b" or "10mm".
remove_embed_number_pattern = r'\d+Embed'
# a hyphen and everything after it, up to the first parenthesis
remove_hypens_pattern = r'\-[^()]*'
# any character that is neither a word character nor whitespace
remove_punctuations_pattern = r'[^\w\s]'

"""
get lyrics api
return string: lyrics
"""
def get_lyrics(song_title: str, song_artist: str):
  """
  Look a song up on Genius and return its cleaned lyrics.

  song_title: track name; parenthesised/bracketed groups matched by
    remove_paranthesis_pattern and any "- From ..." suffix are stripped first
  song_artist: comma separated artist names; only the first one is used
  return string: cleaned lyrics, or "" when the last strategy finds a song
    whose cleaned lyrics are empty

  Search strategies, tried in order until one yields non-empty lyrics:
    0. cleaned title + first artist (skipped when song_artist is empty)
    1. cleaned title only
    2. cleaned title with the text matched by remove_hypens_pattern removed

  Raises Exception when song_title is empty or when the last strategy finds
  no song. Exceptions raised by clean_lyrics (the result does not look like
  lyrics for this title) propagate unchanged and stop the search.
  """
  LAST_STRATEGY = 2

  trimmed_song_title = re.sub(remove_paranthesis_pattern, "", song_title).strip()
  trimmed_song_title = trimmed_song_title.split('- From')[0].strip()
  first_artist = song_artist.split(',')[0].strip()

  # nothing to search with: fail before touching the API
  if len(song_title) == 0:
    raise Exception(
      "Not enough information for '{0}' '{1}'".format(trimmed_song_title, first_artist)
    )

  # without an artist, begin at the title-only search instead of skipping every search
  first_strategy = 0 if len(song_artist) > 0 else 1

  lyrics = ""
  for strategy in range(first_strategy, LAST_STRATEGY + 1):
    if strategy == 0:
      song = genius.search_song(title=trimmed_song_title, artist=first_artist)
    else:
      if strategy == 2:
        # last resort: retry with the hyphenated part of the title removed
        trimmed_song_title = re.sub(remove_hypens_pattern, "", trimmed_song_title).strip()
      song = genius.search_song(title=trimmed_song_title)

    if song is None:
      if strategy < LAST_STRATEGY:
        continue
      raise Exception(trimmed_song_title + " not found in Genius!")

    # clean lyrics; an empty result falls through to the next strategy
    lyrics = clean_lyrics(lyrics=song.lyrics, song_title=trimmed_song_title)
    if len(lyrics) > 0:
      break

  return lyrics


"""
remove song lyrics header XXXX lyric
and remove song lyrics embed [Number]Embed (e.g. 200Embed)
"""
def clean_lyrics(lyrics: str, song_title: str):
  """
  Strip the header line and the trailing embed marker from fetched lyrics.

  lyrics: raw lyrics text; its first line is expected to contain
    "<song_title> Lyrics"
  song_title: title the header line is checked against
  return string: the lines after the header joined with newlines, with the
    text matched by remove_embed_number_pattern removed from the last line
    (which is also stripped); "" when there is nothing after the header

  Raises Exception when the header line does not contain the title. Both
  sides are compared lowercased, without punctuation and with whitespace collapsed.
  """
  def normalise(text: str):
    # lowercase -> drop punctuation -> collapse whitespace
    without_punctuation = re.sub(remove_punctuations_pattern, "", text.lower())
    return remove_extra_whitespace(without_punctuation)

  lines = lyrics.split('\n')
  expected_header = normalise(song_title + " Lyrics")
  actual_header = normalise(lines[0])

  # if lyrics is not lyrics
  if expected_header not in actual_header:
    raise Exception("No lyrics found with Genius!")

  body_lines = lines[1:]
  if not body_lines:
    return ""

  # swap the final line for its cleaned version, then stitch everything together
  body_lines[-1] = re.sub(remove_embed_number_pattern, "", lines[-1]).strip()
  return '\n'.join(body_lines)

In [None]:
# years to process (inclusive); start_year is advanced by the loop below and is
# also read by find_song_lyrics when it logs a failed lookup
start_year = 2017
end_year = 2021
list_of_songs_with_error = []

while start_year <= end_year:
  print("Starting with", start_year)

  # folder holding this year's chart files: "Spotify XXXX"
  path = os.path.join(import_folder_path, "Spotify " + str(start_year))

  # every csv file directly inside that folder
  csv_file_path = [
    os.path.join(path, file_name)
    for file_name in os.listdir(path)
    if file_name.endswith('.csv')
  ]

  # read each csv and stack them into a single dataframe
  df: pd.DataFrame = pd.concat(map(import_csv, csv_file_path))

  # one row per track with summed streams, then lyrics attached
  combined_songs_list = combine_all_songs(df)
  combined_songs_lyrics_list = find_song_lyrics(combined_songs_list)
  df_song_lyrics = pd.DataFrame.from_dict(combined_songs_lyrics_list)

  # export to csv
  print("Done fetching songs of ", start_year)
  export_path = os.path.join(export_folder_path, str(start_year) + "_spotify_data_and_lyrics.csv")
  df_song_lyrics.to_csv(export_path, encoding="utf-8-sig")

  print(start_year, "written to file")
  start_year += 1

# export logged errors to songs_errors.csv so they can be sourced manually
if list_of_songs_with_error:
  song_errors_list = merge_song_errors(list_of_songs_with_error)
  print(len(song_errors_list), "song errors found!, exporting to csv")

  df_error = pd.DataFrame.from_dict(song_errors_list)
  error_path = os.path.join(export_folder_path, "songs_errors.csv")
  df_error.to_csv(error_path, encoding="utf-8-sig")

print("End of Program")