In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import requests
import io
import os
import base64
import time
from dotenv import load_dotenv
from collections import OrderedDict
from datetime import date, timedelta

In [None]:
# Download one Hot 100 CSV per week between start_date and end_date (inclusive),
# stepping seven days at a time, and keep only the first ten rows of each file.
start_date = date(2022, 1, 1)
end_date = date(2024, 8, 8)

current_date = start_date
billboard_dfs = []
while current_date <= end_date:
    current_csv = f"{current_date.strftime('%Y-%m-%d')}.csv"
    billboard_response = requests.get(
        f"https://raw.githubusercontent.com/utdata/rwd-billboard-data/main/data-scraped/hot-100/{current_date.year}/{current_csv}",
        timeout=30,  # do not let one stalled download hang the whole notebook
    )

    if billboard_response.ok:
        billboard_data = billboard_response.content
        print(f"Fetched {current_csv}")

        current_billboard_df = pd.read_csv(io.StringIO(billboard_data.decode("utf-8"))).head(10)
        billboard_dfs.append(current_billboard_df)
    else:
        # Without this check the error body (e.g. "404: Not Found") would be
        # parsed as a CSV and appended as a junk frame; skip the week instead.
        print(f"Skipping {current_csv}: HTTP {billboard_response.status_code}")

    current_date += timedelta(days=7)

In [None]:
# Stack the weekly frames, keep only the columns used downstream, and turn the
# chart_week strings into real timestamps.
billboard_df = (
    pd.concat(billboard_dfs)
    .reset_index()
    .loc[:, ["chart_week", "current_week", "title", "performer"]]
    .assign(chart_week=lambda frame: pd.to_datetime(frame["chart_week"]))
)
billboard_df

In [None]:
# Obtain a Spotify access token with the client-credentials grant. The client id
# and secret are read from the environment (optionally populated from a .env file).
load_dotenv()
client_id = os.getenv("SPOTIFY_CLIENT_ID")
client_secret = os.getenv("SPOTIFY_CLIENT_SECRET")
if not client_id or not client_secret:
    # os.getenv returns None for unset variables, which would otherwise be sent
    # to Spotify as the literal credentials "None:None".
    raise RuntimeError("SPOTIFY_CLIENT_ID and SPOTIFY_CLIENT_SECRET must be set in the environment or a .env file")

credentials = f"{client_id}:{client_secret}"
base64_credentials = base64.b64encode(credentials.encode()).decode()

auth_options = {
    "headers": {"Authorization": f"Basic {base64_credentials}"},
    "data": {"grant_type": "client_credentials"}
}
auth_http_response = requests.post(
    "https://accounts.spotify.com/api/token",
    headers=auth_options["headers"],
    data=auth_options["data"],
    timeout=30,
)
# Fail here with the HTTP status rather than with a KeyError on "access_token".
auth_http_response.raise_for_status()
auth_response = auth_http_response.json()
access_token = auth_response["access_token"]

# The token is deliberately not left as the cell's last expression: echoing it
# would store a live credential in the saved notebook output.
print("Spotify access token acquired")

In [None]:
def parse_artist(artist):
    return artist.split("X")[0].split("&")[0].split(",")[0].split("Featuring")[0].strip()

def parse_song_name(name):
    """Normalise censored words in a song title.

    Any space-separated word containing "*" is rewritten as its first
    character followed by asterisks up to the word's original length; all
    other words, and titles without any "*", are returned untouched.
    """
    if "*" not in name:
        return name

    def mask(token):
        # Keep the leading character, star out everything after it.
        return token[0] + "*" * (len(token) - 1)

    return " ".join(
        mask(token) if "*" in token else token
        for token in name.split(" ")
    )

In [None]:
# Maps (parsed title, parsed artist) -> {"id": <Spotify track id>}, in first-seen order.
track_info = OrderedDict()
def find_track_id(song):
    """Look up the Spotify track id for one chart row and cache it in track_info.

    Args:
        song: Row-like object with "title" and "performer" entries.

    Returns:
        None. On success the id is stored under the (name, artist) key in
        track_info; on failure a diagnostic is printed and nothing is stored.
        Songs already present in track_info are not searched again.
    """
    name = parse_song_name(song["title"])
    artist = parse_artist(song["performer"])

    if (name, artist) in track_info:
        return

    while True:
        search_response = requests.get(
            "https://api.spotify.com/v1/search",
            headers={"Authorization": f"Bearer {access_token}"},
            params={
                "q":  f"{name} {artist}",
                "type": "track",
                "limit": 1
            },
            timeout=30,
        )
        if search_response.status_code == 429:
            print("Too many requests. Retrying in 30 seconds...")
            time.sleep(30)
            continue

        try:
            track_id = search_response.json()["tracks"]["items"][0]["id"]
        except (ValueError, KeyError, IndexError, TypeError):
            # Only "no usable result" failures are caught (non-JSON body, missing
            # keys, empty item list); KeyboardInterrupt and similar still propagate.
            print(f"Error in '{name}' by '{artist}'")
            # Print the raw body: calling .json() again would raise on a non-JSON reply.
            print(search_response.text)
            return

        track_info[(name, artist)] = {"id": track_id}
        print(f"Id for '{name}' by '{artist}' is {track_id}")
        return

# Run the lookup once per chart row; find_track_id skips songs already in track_info.
for _, chart_row in billboard_df.iterrows():
    find_track_id(chart_row)
print(f"Total number of track ids found: {len(track_info)}")

In [None]:
# Fetch Spotify audio features for every cached track id in batches of 100, then
# replace each track_info entry with its feature dictionary.
track_ids = [entry["id"] for entry in track_info.values()]
audio_features = []
for i in range(0, len(track_ids), 100):
    chunk = track_ids[i:i + 100]

    while True:
        audio_features_response = requests.get(
            "https://api.spotify.com/v1/audio-features",
            headers={"Authorization": f"Bearer {access_token}"},
            params={"ids": ",".join(chunk)},
            timeout=30,
        )
        if audio_features_response.status_code == 429:
            print("Too many requests. Retrying in 30 seconds...")
            time.sleep(30)
            continue

        # Surface 401/403/5xx as an HTTP error instead of a KeyError on "audio_features".
        audio_features_response.raise_for_status()
        print(f"Fetched audio features for tracks {i+1} to {i+len(chunk)}")
        audio_features += audio_features_response.json()["audio_features"]
        break

# The merge below is positional, so a length mismatch would silently attach
# features to the wrong songs.
if len(audio_features) != len(track_ids):
    raise RuntimeError(
        f"Expected {len(track_ids)} audio feature entries but received {len(audio_features)}"
    )

missing_features = 0
# list(...) snapshots the keys because entries may be deleted during the loop.
for key, features in zip(list(track_info), audio_features):
    if features is None:
        # The API can return null for a track it has no features for (see the
        # endpoint reference); drop the entry so later lookups treat it as missing.
        del track_info[key]
        missing_features += 1
    else:
        track_info[key] = features

if missing_features:
    print(f"No audio features available for {missing_features} track(s)")

if track_info:
    print(f"Features found: {', '.join(next(iter(track_info.values())).keys())}")
else:
    print("No audio features found")

In [None]:
def add_feature(song, feature):
    """Return one audio feature value for a chart row, or None if unavailable.

    Args:
        song: Row-like object with "title" and "performer" entries.
        feature: Key to read from the song's track_info entry, e.g. "tempo".

    Returns:
        The stored value, or None when the song has no track_info entry, the
        entry is not a dictionary (e.g. None), or the entry lacks that key.
    """
    name = parse_song_name(song["title"])
    artist = parse_artist(song["performer"])
    features = track_info.get((name, artist))
    # Guard against entries that are None or hold only an id, so a single
    # incomplete track yields a missing value instead of aborting the apply.
    if not isinstance(features, dict):
        return None
    return features.get(feature)

# Add one column per audio feature, looked up row by row through add_feature.
for feature_name in (
    "danceability",
    "energy",
    "loudness",
    "speechiness",
    "acousticness",
    "liveness",
    "tempo",
):
    billboard_df[feature_name] = billboard_df.apply(add_feature, args=(feature_name,), axis=1)

billboard_df

In [None]:
# Persist the enriched chart table next to the notebook, without the row index.
billboard_df.to_csv(path_or_buf="billboard.csv", index=False)

In [None]:
# Danceability over time, one line per chart position (1 through 10).
billboard_fig = plt.figure(figsize=(30, 10))
for rank in range(1, 11):
    rank_rows = billboard_df[billboard_df["current_week"] == rank]
    rank_rows.set_index("chart_week")["danceability"].plot(label=rank, ylabel="danceability")

plt.legend()
plt.show()

In [None]:
# Download the daily GPR spreadsheet and keep the three index columns from
# 2022-01-01 onwards, indexed by date.
gpr_response = requests.get(
    "https://www.matteoiacoviello.com/gpr_files/data_gpr_daily_recent.xls",
    timeout=60,
)
# Stop on an HTTP error instead of handing an error page to the Excel parser.
gpr_response.raise_for_status()
gpr_data = gpr_response.content

# read_excel already returns a DataFrame, so no extra pd.DataFrame wrapper is needed.
gpr_df = pd.read_excel(io.BytesIO(gpr_data), index_col="date", parse_dates=True)
gpr_df = gpr_df.loc["2022-01-01":, ["GPRD", "GPRD_THREAT", "GPRD_ACT"]]
gpr_df

In [None]:
# Overlay the three GPR series on a single wide figure.
gpr_fig = plt.figure(figsize=(30, 10))
for gpr_column in ("GPRD", "GPRD_THREAT", "GPRD_ACT"):
    gpr_df[gpr_column].plot()

plt.legend()
plt.show()