In [None]:
# import relevant libraries
import ast
import base64
import concurrent.futures
import logging
import os
import time
from datetime import datetime

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import requests
import seaborn as sns
from notebook.services.config import ConfigManager
from ratelimit import limits, sleep_and_retry

In [None]:
# reload the data
# the CSV location can be overridden through the STREAMS_CSV_PATH environment variable so the
# notebook is not tied to a single machine; the original location is kept as the default
streams_csv_path = os.environ.get(
    'STREAMS_CSV_PATH',
    r'/Users/adityamxr/Desktop/spotify-time-series/data-fetching/streams_df_1.csv',
)
streams_df = pd.read_csv(streams_csv_path)

In [None]:
# raise the IOPub data rate limit so the notebook keeps showing output while large volumes
# of data (or many API rate-limit log messages) are being streamed to the front end
# the nested dict is merged into the 'notebook' config section; the value returned by
# update() is kept in `cm`
# NOTE(review): presumably this only takes effect once the notebook server is restarted,
# and it relies on the classic `notebook.services.config` module being installed - confirm
cm = ConfigManager().update(
    'notebook',
    {'NotebookApp': {'iopub_data_rate_limit': 100_000_000}},
)

In [None]:
# set up logging
logging.basicConfig(level=logging.INFO)

# spotify API credentials
# read from the environment instead of being written into the notebook, so the secret is
# never saved or shared together with the code
# NOTE: credentials that were ever committed inside a notebook should be rotated in the
# Spotify developer dashboard
client_id = os.environ.get('SPOTIFY_CLIENT_ID')
client_secret = os.environ.get('SPOTIFY_CLIENT_SECRET')

if not client_id or not client_secret:
    # report the problem here, where it is easy to understand, rather than as an opaque
    # HTTP error when the token is requested later
    logging.error(
        "Spotify credentials are missing: set the SPOTIFY_CLIENT_ID and "
        "SPOTIFY_CLIENT_SECRET environment variables before starting the kernel"
    )

# cache to store artist-genre mappings (artist name -> list of genres)
genre_cache = {}

# function to get the access token
def get_access_token(client_id, client_secret, timeout=10):
    """Request an access token from the Spotify accounts service.

    The client id and secret are joined as "id:secret", base64-encoded and sent as a
    Basic authorization header together with grant_type=client_credentials.

    Args:
        client_id: Spotify application client id.
        client_secret: Spotify application client secret.
        timeout: seconds to wait for the token endpoint before giving up.

    Returns:
        The 'access_token' value of the JSON response, or None when the response
        does not contain that key.

    Raises:
        ValueError: if either credential is empty or None.
        requests.exceptions.RequestException: if the request fails, times out or
            returns an HTTP error status.
    """
    # fail early with a clear message instead of sending "None:None" to the API
    if not client_id or not client_secret:
        raise ValueError("Spotify client_id and client_secret must both be provided")

    token_url = "https://accounts.spotify.com/api/token"
    credentials = f"{client_id}:{client_secret}"
    encoded_credentials = base64.b64encode(credentials.encode()).decode()

    headers = {
        "Authorization": f"Basic {encoded_credentials}"
    }
    data = {
        "grant_type": "client_credentials"
    }

    # without a timeout a stalled connection would block the notebook indefinitely
    response = requests.post(token_url, headers=headers, data=data, timeout=timeout)
    response.raise_for_status()
    access_token = response.json().get('access_token')
    return access_token

In [None]:
# request a token for the API calls below, using the client id and secret defined earlier
# NOTE(review): tokens from the client-credentials flow are presumably short-lived; a fetch
# that runs for hours may outlive this token - confirm and re-request it if needed
access_token = get_access_token(
    client_id=client_id,
    client_secret=client_secret,
)

# define a rate-limited function to fetch genres for a given artist from the spotify api
# the function is rate-limited to 10 calls per second using the 'limits' decorator
@sleep_and_retry  # when the local call budget is used up, sleep until the period ends and call again
@limits(calls=10, period=1)  # limit to 10 API calls per second
def get_artist_genres(artist_name, access_token, timeout=10, max_retries=3):
    """Search Spotify for an artist by name and return the genres of the top match.

    Args:
        artist_name: text used as the search query.
        access_token: bearer token sent in the Authorization header.
        timeout: seconds to wait for each HTTP request before giving up.
        max_retries: number of extra attempts made after an HTTP 429 response.

    Returns:
        The 'genres' list of the first search result, or an empty list when the
        search returns no artist or the request fails (the failure is logged).
    """
    # spotify api endpoint to search for an artist
    search_url = "https://api.spotify.com/v1/search"

    # set up the authorization header with the Bearer token
    headers = {
        "Authorization": f"Bearer {access_token}"
    }

    # set up the query parameters to search for the artist by name
    params = {
        "q": artist_name,  # artist name to search for
        "type": "artist",  # specify that we are searching for an artist
        "limit": 1  # limit the search to 1 artist (the most relevant one)
    }

    try:
        attempt = 0
        while True:
            # the timeout keeps a stalled connection from blocking a worker thread forever
            response = requests.get(search_url, headers=headers, params=params, timeout=timeout)

            # the decorators above only throttle our own call rate; a 429 from the API
            # itself is handled here by waiting and retrying a bounded number of times
            if response.status_code != 429 or attempt >= max_retries:
                break

            try:
                wait_seconds = max(int(response.headers.get('Retry-After', 1)), 0)
            except (TypeError, ValueError):
                # header missing or not a plain number of seconds
                wait_seconds = 1

            attempt += 1
            logging.warning(
                f"Rate limited while fetching {artist_name}; "
                f"retry {attempt}/{max_retries} in {wait_seconds}s"
            )
            time.sleep(wait_seconds)

        response.raise_for_status()  # Raise an exception for any HTTP errors

        # parse the JSON response to extract artist data
        data = response.json()
        artists = data.get('artists', {}).get('items', [])

        # if no artist is found, log a warning and return an empty list
        if not artists:
            logging.warning(f"No artist found for {artist_name}")
            return []

        # get the genres associated with the first (most relevant) artist found
        artist_info = artists[0]
        genres = artist_info.get('genres', [])
        return genres

    except requests.exceptions.RequestException as e:
        # log any exceptions that occur during the API request
        logging.error(f"Error fetching data for {artist_name}: {e}")
        return []

In [None]:
# cached wrapper around get_artist_genres so every artist is requested from the API only once
def get_artist_genres_cached(artist_name, access_token):
    """Return the genres for an artist, consulting `genre_cache` before the API.

    On a cache miss the result of get_artist_genres is stored under the artist name
    and returned. Whatever get_artist_genres returns is stored, including an empty
    list, so an artist whose lookup came back empty is not requested again.
    """
    if artist_name not in genre_cache:
        # first time this name is seen: ask the API and remember the answer
        fetched = get_artist_genres(artist_name, access_token)
        genre_cache[artist_name] = fetched
        logging.info(f"Fetched and cached genres for '{artist_name}': {fetched}")
        return fetched

    # name already looked up: reuse the stored genres
    logging.info(f"Cache hit for '{artist_name}'")
    return genre_cache[artist_name]

In [None]:
# function to fetch genres for a list of artists in parallel, using caching and rate limiting
def fetch_genres_for_artists_parallel(df, access_token):
    """Fill a 'genres' column with the combined genres of each row's artists.

    Args:
        df: DataFrame with an 'artists' column whose cells are iterables of artist
            names; entries that are not strings are skipped. The frame is modified
            in place.
        access_token: token passed through to get_artist_genres_cached.

    Returns:
        The same DataFrame object, with 'genres' holding a sorted list of unique
        genres per row (an empty list when nothing was found or the row failed).
    """
    # (re)create the 'genres' column; any existing values are overwritten with None
    df['genres'] = None

    def get_genres_for_multiple_artists(artists_list):
        all_genres = set()
        for artist in artists_list:
            if isinstance(artist, str):  # ensure that artist is a string
                all_genres.update(get_artist_genres_cached(artist, access_token))
        # sorted so the stored list does not depend on set iteration order
        return sorted(all_genres)

    # create a thread pool to process artists in parallel
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        # map the future tasks to their respective dataframe indices; only the 'artists'
        # column is iterated, so no full row object is built per task
        futures = {
            executor.submit(get_genres_for_multiple_artists, artists): idx
            for idx, artists in df['artists'].items()
        }
        for future in concurrent.futures.as_completed(futures):
            idx = futures[future]  # get the index of the dataframe row
            try:
                genres = future.result()  # get the result of the future task
                df.at[idx, 'genres'] = genres  # update the dataframe with the fetched genres
                logging.info(f"Updated genres for index {idx}: {genres}")
            except Exception as e:
                # one failing row must not abort the whole run; record it and continue
                logging.error(f"Error processing index {idx}: {e}")
                df.at[idx, 'genres'] = []  # set genres to an empty list if an error occurs
    return df

In [None]:
# make sure each entry in the 'artists' column is a list
# values loaded by pd.read_csv are never Python lists, so a stringified list such as
# "['A', 'B']" is parsed back into a real list; anything else is wrapped in a one-item list
def _as_artist_list(value):
    """Return `value` as a list of artists.

    Lists are returned unchanged, strings that hold a Python list literal are
    parsed into that list, and every other value becomes a single-element list.
    """
    if isinstance(value, list):
        return value
    if isinstance(value, str):
        text = value.strip()
        if text.startswith('[') and text.endswith(']'):
            try:
                parsed = ast.literal_eval(text)
            except (ValueError, SyntaxError):
                # looks like a list but is not valid literal syntax: treat as a plain name
                parsed = None
            if isinstance(parsed, list):
                return parsed
    return [value]

streams_df['artists'] = streams_df['artists'].apply(_as_artist_list)

# fetch genres for every row of the DataFrame (the frame is updated in place and returned)
test_df_with_genres = fetch_genres_for_artists_parallel(streams_df, access_token)
print(test_df_with_genres.head())

In [None]:
# tally the rows whose 'genres' entry has no elements
empty_genre_count = streams_df['genres'].map(len).eq(0).sum()

print(f"Number of rows with empty genres: {empty_genre_count}")

In [None]:
# impute the empty lists in the genre column with "Unknown"
# done column-wise rather than looping over range(len(...)) with .at, which only works when
# the index labels are exactly 0..n-1; every empty row gets its own fresh ["Unknown"] list
streams_df['genres'] = streams_df['genres'].apply(
    lambda genres: genres if len(genres) > 0 else ["Unknown"]
)

In [None]:
# repeat the empty-list tally now that the imputation has run
empty_genre_count = (streams_df['genres'].apply(len) == 0).sum()

print(f"Number of rows with empty genres: {empty_genre_count}")

# empty lists have been imputed with "Unknown"

In [None]:
# spot-check the imputation on the first five rows of the genres column
print(streams_df['genres'].head(n=5))

In [None]:
# basic EDA
# check shape of dataframe: prints the (rows, columns) tuple of streams_df,
# which at this point includes the 'genres' column added above
print(f"Dataset Shape: {streams_df.shape}")

In [None]:
# number of missing values in the popularity column (shown as the cell output)
streams_df['popularity'].isna().sum()

In [None]:
# smallest and largest values taken by the popularity feature
popularity_min, popularity_max = (
    streams_df['popularity'].min(),
    streams_df['popularity'].max(),
)

print(f"The range of popularity values is from {popularity_min} to {popularity_max}.")

In [None]:
# checkpoint: the API genre fetch took nearly 3 hours, so persist the dataframe to a csv
# before continuing with the EDA
# NOTE(review): list-valued cells (artists, genres) are presumably written as their text
# form and will need parsing when this file is read back - confirm
streams_df.to_csv(path_or_buf='streams_df_with_genres.csv', index=False)

In [None]:
# popularity has no missing values, so no imputation is needed there
# the id, name, artists and explicit columns are not used in this analysis, so a copy of
# the dataframe without them is created (streams_df itself is left untouched)

columns_drop = ['id', 'explicit', 'artists', 'name']
streams_df_clean = streams_df.drop(labels=columns_drop, axis='columns')
streams_df_clean.head()

In [None]:
# no nulls exist across all features. Show the earliest year, the latest year and the
# span between them for the year feature

(
    streams_df_clean['year'].min(),
    streams_df_clean['year'].max(),
    streams_df_clean['year'].max() - streams_df_clean['year'].min(),
)

# 99 years of data exists, so a yearly granularity is enough for the time series
# day/month granularity is unnecessary because the goal is to forecast genre popularity
# over the next few decades, therefore the release date column can be dropped

In [None]:
# yearly granularity is sufficient, so build a copy without the release_date column
streams_df_clean_2 = streams_df_clean.drop(columns=['release_date'])
streams_df_clean_2.head()