In [1]:
!pip install billboard.py lyricsgenius pandas numpy



In [2]:
!pip uninstall billboard.py -y
!pip install billboard.py
!pip show billboard.py

Found existing installation: billboard.py 7.1.0
Uninstalling billboard.py-7.1.0:
  Successfully uninstalled billboard.py-7.1.0
Collecting billboard.py
  Using cached billboard.py-7.1.0-py2.py3-none-any.whl (10 kB)
Installing collected packages: billboard.py
Successfully installed billboard.py-7.1.0
Name: billboard.py
Version: 7.1.0
Summary: Python API for downloading Billboard charts
Home-page: https://github.com/guoguo12/billboard-charts
Author: Allen Guo
Author-email: guoguo12@gmail.com
License: MIT License
Location: /Users/varun/miniconda3/envs/tf/lib/python3.9/site-packages
Requires: beautifulsoup4, requests
Required-by: 


In [3]:
!pip install spotipy python-dotenv

Collecting python-dotenv
  Downloading python_dotenv-1.1.0-py3-none-any.whl (20 kB)
Installing collected packages: python-dotenv
Successfully installed python-dotenv-1.1.0


In [4]:
import billboard
import spotipy 
from spotipy.oauth2 import SpotifyClientCredentials
import pandas as pd
import numpy as np
import time
import datetime
import os
import re
from dotenv import load_dotenv
import os

load_dotenv()

spotify_CLIENT_ID = os.getenv("spotify_CLIENT_ID")
spotify_CLIENT_SECRET = ("spotify_CLIENT_SECRET")

CHART_NAME = 'hot-100' 
OUTPUT_CSV_FILE = f'{CHART_NAME}_spotify_filtered_bigger_artists.csv' 
NUMBER_OF_WEEKS_TO_FETCH = 1
MAX_SONGS_PER_CHART = 2
FETCH_SPOTIFY_DATA = True 

BILLBOARD_DELAY = 2 
SPOTIFY_DELAY = 0.5 

ARTIST_SCORE_THRESHOLD_PERCENTILE = 0.5 # Keep top 50% artists by score
W_PEAK = 1.0        # Importance of peak position
W_LONGEVITY = 10.0  # Importance of longevity (log scaled)
B_TOP10 = 50        # Bonus for Top 10
B_NUM1 = 150        # Bonus for #1

TARGET_GENRE_KEYWORD = 'pop' 
# Set to True to enable filtering based on Spotify's artist/album genres
PERFORM_GENRE_FILTER = False # Keep False initially, enable after verifying genre data

# --- Initialize Spotify API ---
sp = None # Initialize sp to None
if FETCH_SPOTIFY_DATA:
    if spotify_CLIENT_ID == 'YOUR_SPOTIFY_CLIENT_ID' or spotify_CLIENT_SECRET == 'YOUR_SPOTIFY_CLIENT_SECRET':
        raise ValueError("CRITICAL: Set your spotify_CLIENT_ID and spotify_CLIENT_SECRET.")
    try:
        print("Initializing Spotify API...")
        client_credentials_manager = SpotifyClientCredentials(client_id=spotify_CLIENT_ID, client_secret=spotify_CLIENT_SECRET)
        sp = spotipy.Spotify(client_credentials_manager=client_credentials_manager)
        # Make a test call to ensure authentication works
        sp.search(q='test', limit=1)
        print("Spotify API initialized successfully.")
    except Exception as e:
        print(f"FATAL ERROR: Could not initialize Spotify API: {e}")
        print("Spotify data fetching will be disabled.")
        FETCH_SPOTIFY_DATA = False

# --- Helper Functions ---

def get_chart_dates(num_weeks):
    """Generates a list of PAST Saturday dates in 'YYYY-MM-DD' format."""
    # (Code unchanged from previous correct version)
    dates = []
    today = datetime.date.today()
    current_saturday = today - datetime.timedelta(days=(today.weekday() + 2) % 7)
    print(f"Starting date generation from most recent Saturday: {current_saturday}")
    for i in range(num_weeks):
        chart_date_dt = current_saturday - datetime.timedelta(weeks=i)
        chart_date_str = chart_date_dt.strftime('%Y-%m-%d')
        dates.append(chart_date_str)
    print(f"Generated {len(dates)} chart dates, from {dates[0]} back to {dates[-1]}.")
    return dates

def clean_search_query(text):
    """Removes common variations like 'feat.', 'Remix', etc., for better API matching."""
    # (Code unchanged from previous correct version)
    if not isinstance(text, str):
        return ""
    text = re.sub(r'\sfeat\.?\s.*', '', text, flags=re.IGNORECASE)
    text = re.sub(r'\sx\s.*', '', text, flags=re.IGNORECASE)
    text = re.sub(r'\s&\s.*', '', text, flags=re.IGNORECASE)
    text = re.sub(r'\swith\s.*', '', text, flags=re.IGNORECASE)
    text = re.sub(r'\(.*Remix.*\)', '', text, flags=re.IGNORECASE)
    text = re.sub(r'\(.*Radio Edit.*\)', '', text, flags=re.IGNORECASE)
    text = re.sub(r'\s*\(.*\)', '', text)
    return text.strip()

def parse_release_date(date_str, precision):
    """Parses Spotify release date based on precision."""
    try:
        if precision == 'day':
            return pd.to_datetime(date_str, format='%Y-%m-%d')
        elif precision == 'month':
            return pd.to_datetime(date_str + '-01', format='%Y-%m-%d') # Assume 1st of month
        elif precision == 'year':
            return pd.to_datetime(date_str + '-01-01', format='%Y-%d-%m') # Assume Jan 1st
        else:
            return pd.NaT # Not a Time for unknown precision
    except ValueError:
        return pd.NaT # Handle parsing errors

def fetch_spotify_track_data(sp_client, artist_name, track_title):
    """Fetches metadata for a track from Spotify API."""
    if not FETCH_SPOTIFY_DATA or sp_client is None:
        return None # Skip if disabled or client not initialized

    # Clean inputs for better search
    search_artist = clean_search_query(artist_name).split(',')[0].split('&')[0].strip() # Primary artist
    search_track = clean_search_query(track_title)
    query = f'artist:"{search_artist}" track:"{search_track}"' # Use quotes for more exact match

    # print(f"    Searching Spotify: {query}") # Uncomment for debug logs
    try:
        results = sp_client.search(q=query, type='track', limit=1)
        time.sleep(SPOTIFY_DELAY) # Pause after API call

        if results and results['tracks']['items']:
            track = results['tracks']['items'][0]
            album = track['album'] # Album info associated with the track

            # Basic track info
            spotify_data = {
                'spotify_id': track.get('id'),
                'spotify_track_name': track.get('name'),
                'spotify_primary_artist': track['artists'][0].get('name') if track.get('artists') else None,
                'spotify_all_artists': ', '.join([a.get('name', '') for a in track.get('artists', [])]),
                'spotify_track_url': track['external_urls'].get('spotify'),
                'popularity': track.get('popularity'),
                'duration_ms': track.get('duration_ms'),
                'explicit': track.get('explicit'),

                # Album info from the track object
                'album_id': album.get('id'),
                'album_name': album.get('name'),
                'album_release_date': parse_release_date(album.get('release_date'), album.get('release_date_precision')),
                'album_total_tracks': album.get('total_tracks'),
                'album_url': album['external_urls'].get('spotify'),
                'album_genres': ', '.join(album.get('genres', [])), # Genres often on album
                'album_label': album.get('label'), # Sometimes label info is here

                # Artist genres (often more reliable than album genres)
                'artist_genres': None, # Placeholder, fetch below

                # Placeholders for harder-to-get credits
                'writers': None, # Often requires external linking or parsing copyrights
                'producers': None, # Often requires external linking or parsing copyrights
                'album_copyrights': None # Fetch below
            }

            # --- Fetch Additional Details (Optional, adds API calls) ---
            # Get Artist Genres (more reliable source)
            if track.get('artists'):
                try:
                    artist_id = track['artists'][0].get('id')
                    if artist_id:
                        artist_details = sp_client.artist(artist_id)
                        time.sleep(SPOTIFY_DELAY)
                        spotify_data['artist_genres'] = ', '.join(artist_details.get('genres', []))
                except Exception as e_artist:
                    print(f"      WARN: Could not fetch artist details for {artist_id}: {e_artist}")

            # Get Album Copyrights (might contain writer/producer info textually)
            if spotify_data['album_id']:
                 try:
                     album_details = sp_client.album(spotify_data['album_id'])
                     time.sleep(SPOTIFY_DELAY)
                     spotify_data['album_copyrights'] = ', '.join([c.get('text', '') for c in album_details.get('copyrights', [])])
                 except Exception as e_album:
                     print(f"      WARN: Could not fetch album details for {spotify_data['album_id']}: {e_album}")

            # print(f"    Found on Spotify: {spotify_data['spotify_track_name']} by {spotify_data['spotify_primary_artist']}") # Uncomment for debug
            return spotify_data
        else:
            # print(f"    Not found on Spotify.") # Uncomment for debug
            return None
    except Exception as e:
        print(f"    WARNING: Error fetching Spotify data for '{track_title}' by '{artist_name}': {e}")
        time.sleep(SPOTIFY_DELAY * 2) # Longer pause after error
        return None

# --- Main Data Collection ---
print(f"\n--- Starting Data Collection for {CHART_NAME} ---")
all_chart_entries = []
chart_dates = get_chart_dates(NUMBER_OF_WEEKS_TO_FETCH)

# Loop through each calculated chart date
for i, chart_date in enumerate(chart_dates):
    print(f"\nFetching Billboard chart {i+1}/{len(chart_dates)} for date: {chart_date}")
    try:
        # Fetch chart data for the specific date
        chart = billboard.ChartData(CHART_NAME, date=chart_date) # Use billboard.Chart
        print(f"  Successfully fetched Billboard chart with {len(chart)} entries.")
        time.sleep(BILLBOARD_DELAY) # Pause between Billboard fetches

        # Process each song entry on the chart for that week
        for entry_rank, entry in enumerate(chart[:MAX_SONGS_PER_CHART]):
            print(f"  Processing Billboard Entry #{entry_rank+1}: '{entry.title}' by {entry.artist}")
            # Basic song data from Billboard
            billboard_info = {
                'chart_date': chart_date, # String 'YYYY-MM-DD'
                'chart_name': CHART_NAME,
                'rank': entry.rank,
                'title': entry.title,
                'artist': entry.artist, # Original artist string from Billboard
                'peak_pos': entry.peakPos,
                'last_pos': entry.lastPos,
                'weeks_on_chart': entry.weeks,
                'is_new': entry.isNew,
            }

            # Attempt to fetch complementary data from Spotify
            spotify_data = fetch_spotify_track_data(sp, entry.artist, entry.title)

            # Combine Billboard data with Spotify data (if found)
            if spotify_data:
                billboard_info.update(spotify_data)
            else:
                # Add placeholder keys if Spotify data wasn't found or fetch failed
                spotify_placeholders = {
                    'spotify_id': None, 'spotify_track_name': None, 'spotify_primary_artist': None,
                    'spotify_all_artists': None, 'spotify_track_url': None, 'popularity': None,
                    'duration_ms': None, 'explicit': None, 'album_id': None, 'album_name': None,
                    'album_release_date': None, 'album_total_tracks': None, 'album_url': None,
                    'album_genres': None, 'artist_genres': None, 'album_label': None, 'writers': None,
                    'producers': None, 'album_copyrights': None
                }
                billboard_info.update(spotify_placeholders)

            # Add the combined entry to our master list
            all_chart_entries.append(billboard_info)

    except Exception as e:
        print(f"  WARNING: Could not fetch or process Billboard chart for {chart_date}. Error: {e}")
        print(f"  Skipping this date.")
        time.sleep(BILLBOARD_DELAY * 2)
        continue # Skip to the next date

print("\n--- Data Collection Finished ---")

# --- Data Processing and Feature Engineering ---

if not all_chart_entries:
    print("FATAL: No data was collected (likely due to chart fetching errors). Exiting.")
    exit()

print("\n--- Processing Collected Data ---")
df = pd.DataFrame(all_chart_entries)
print(f"Created initial DataFrame with {len(df)} rows and {len(df.columns)} columns.")
# print("Initial DataFrame columns:", df.columns.tolist()) # Uncomment to debug columns

# --- Basic Cleaning & Type Conversion ---
print("Cleaning data and converting types...")
# Convert date columns (chart_date is string, album_release_date should be datetime)
if 'chart_date' in df.columns:
    df['chart_date'] = pd.to_datetime(df['chart_date'], errors='coerce')
if 'album_release_date' in df.columns:
    # Already parsed to datetime or NaT in fetch function
    pass # df['album_release_date'] = pd.to_datetime(df['album_release_date'], errors='coerce')

# Convert numerical columns
numeric_cols = ['rank', 'peak_pos', 'last_pos', 'weeks_on_chart', 'popularity', 'duration_ms', 'album_total_tracks']
print("Converting numeric columns:", numeric_cols)
for col in numeric_cols:
    if col in df.columns:
        # Handle potential non-numeric placeholders before conversion
        df[col] = pd.to_numeric(df[col], errors='coerce')
        nan_count = df[col].isna().sum()
        if nan_count > 0 and nan_count < len(df): # Don't warn if column is entirely NaN
             print(f"  INFO: Column '{col}' has {nan_count} NaN values after conversion.")
    else:
        print(f"  WARNING: Numeric column '{col}' not found. Adding as empty.")
        df[col] = np.nan

# Convert boolean columns
for col in ['is_new', 'explicit']:
    if col in df.columns:
        df[col] = df[col].astype('boolean') # Use Pandas nullable boolean type
    else:
        print(f"  WARNING: Boolean column '{col}' not found. Adding as False.")
        df[col] = False


# --- Calculate Artist Success Score ---
# Uses Billboard 'artist' string and chart data. Logic remains the same.
print("\nCalculating Artist Success Score (based on Billboard data)...")
# (Function definition calculate_artist_success unchanged from previous version)
def calculate_artist_success(df_calc, w_peak, w_longevity, b_top10, b_num1):
    required_cols = ['artist', 'title', 'peak_pos', 'weeks_on_chart']
    if not all(col in df_calc.columns for col in required_cols):
        print("  WARNING: Missing required columns for score calculation:", [col for col in required_cols if col not in df_calc.columns])
        return pd.Series(dtype=float)
    print("  Aggregating best performance for each unique song by artist (Billboard data)...")
    df_calc_cleaned = df_calc.dropna(subset=required_cols)
    if df_calc_cleaned.empty:
        print("  WARNING: No valid data remaining after cleaning for score calculation.")
        return pd.Series(dtype=float)
    try:
        song_summary = df_calc_cleaned.groupby(['artist', 'title'], observed=True).agg(
            song_peak_pos=('peak_pos', 'min'),
            song_weeks_on_chart=('weeks_on_chart', 'max')
        ).reset_index()
    except Exception as e:
        print(f"  ERROR during groupby/aggregation for score calculation: {e}")
        return pd.Series(dtype=float)
    print(f"  Aggregated performance for {len(song_summary)} unique artist-song combinations.")
    song_summary['peak_score'] = w_peak * (101 - song_summary['song_peak_pos'])
    song_summary['longevity_score'] = w_longevity * np.log1p(song_summary['song_weeks_on_chart'])
    song_summary['top10_bonus'] = np.where(song_summary['song_peak_pos'] <= 10, b_top10, 0)
    song_summary['num1_bonus'] = np.where(song_summary['song_peak_pos'] == 1, b_num1, 0)
    song_summary['total_song_score'] = (song_summary['peak_score'] + song_summary['longevity_score'] +
                                        song_summary['top10_bonus'] + song_summary['num1_bonus'])
    print("  Summing song scores to get final artist scores...")
    artist_scores = song_summary.groupby('artist', observed=True)['total_song_score'].sum()
    return artist_scores.sort_values(ascending=False)

# Calculate scores using the Billboard 'artist' column
artist_scores = calculate_artist_success(df.copy(), W_PEAK, W_LONGEVITY, B_TOP10, B_NUM1)


# --- Filter by Artist Score ---
# Logic unchanged, filters based on the calculated scores
if artist_scores.empty or artist_scores.isna().all():
    print("\nWARNING: Artist scores could not be calculated. Skipping filtering by score.")
    df_filtered = df.copy()
else:
    # (Filtering logic based on ARTIST_SCORE_THRESHOLD_PERCENTILE is unchanged)
    print(f"\nCalculated success scores for {len(artist_scores)} artists (Billboard names).")
    print("Top 10 Artists by Calculated Success Score:")
    print(artist_scores.head(10))
    print(f"\nFiltering for 'Bigger Artists' (Top {(1-ARTIST_SCORE_THRESHOLD_PERCENTILE)*100:.0f}% by score)...")
    if ARTIST_SCORE_THRESHOLD_PERCENTILE > 0.0 and ARTIST_SCORE_THRESHOLD_PERCENTILE < 1.0 :
        score_threshold = artist_scores.quantile(1 - ARTIST_SCORE_THRESHOLD_PERCENTILE)
        bigger_artists_list = artist_scores[artist_scores >= score_threshold].index.tolist()
        print(f"  Score threshold: {score_threshold:.2f}")
        print(f"  Identified {len(bigger_artists_list)} artists meeting the threshold.")
        if 'artist' in df.columns:
            df_filtered = df[df['artist'].isin(bigger_artists_list)].copy()
            print(f"  DataFrame filtered to {len(df_filtered)} rows for bigger artists.")
        else:
            print("  WARNING: 'artist' column missing, cannot filter by bigger artists score. Keeping all data.")
            df_filtered = df.copy()
    elif ARTIST_SCORE_THRESHOLD_PERCENTILE >= 1.0:
         print(f"  Threshold percentile ({ARTIST_SCORE_THRESHOLD_PERCENTILE}) >= 100%, keeping no artists based on this filter.")
         df_filtered = df.iloc[0:0].copy()
    else: # <= 0.0
        print("  Threshold percentile <= 0, keeping all artists.")
        df_filtered = df.copy()

# --- Add Artist Success Score as a Column ---
# Logic unchanged, maps score based on Billboard 'artist' name
if 'artist' in df_filtered.columns:
    if not artist_scores.empty:
        print("Adding Artist Success Score column to the DataFrame...")
        df_filtered['artist_success_score'] = df_filtered['artist'].map(artist_scores)
        df_filtered['artist_success_score'].fillna(0, inplace=True)
    else:
        print("Artist scores are empty, adding 'artist_success_score' column with default 0.")
        df_filtered['artist_success_score'] = 0
else:
    print("WARNING: 'artist' column missing in df_filtered. Cannot add 'artist_success_score'.")
    df_filtered['artist_success_score'] = 0


# --- Genre Filtering (Using Spotify Data) ---
print(f"\nGenre Filtering Section (Target Keyword: '{TARGET_GENRE_KEYWORD}')")
if PERFORM_GENRE_FILTER:
    print(f"Attempting to filter based on Spotify 'artist_genres' or 'album_genres'...")
    # Check if either genre column exists
    artist_genre_col = 'artist_genres' if 'artist_genres' in df_filtered.columns else None
    album_genre_col = 'album_genres' if 'album_genres' in df_filtered.columns else None

    if artist_genre_col or album_genre_col:
        original_row_count = len(df_filtered)
        # Create a boolean mask: True if keyword found in either artist or album genres
        mask = pd.Series(False, index=df_filtered.index) # Start with all False
        if artist_genre_col:
            # Ensure string, handle NaN, search case-insensitively
            mask |= df_filtered[artist_genre_col].astype(str).fillna('').str.contains(TARGET_GENRE_KEYWORD, case=False, na=False)
        if album_genre_col:
            mask |= df_filtered[album_genre_col].astype(str).fillna('').str.contains(TARGET_GENRE_KEYWORD, case=False, na=False)

        df_filtered = df_filtered[mask]
        print(f"  Filtered from {original_row_count} to {len(df_filtered)} rows based on genre keyword '{TARGET_GENRE_KEYWORD}'.")
        if len(df_filtered) == 0:
            print(f"  WARNING: Genre filter resulted in zero rows. Check keyword or Spotify genre data availability.")
    else:
        print(f"  WARNING: Neither 'artist_genres' nor 'album_genres' column found. Skipping genre filter.")
else:
    print("  Genre filtering is currently disabled (PERFORM_GENRE_FILTER = False).")


# --- Final Data Preparation & Output ---
print("\n--- Preparing Final Data and Saving ---")

# Define the desired order of columns for the final CSV output
final_columns = [
    # Billboard core
    'chart_date', 'chart_name', 'rank', 'title', 'artist',
    'artist_success_score',
    'peak_pos', 'last_pos', 'weeks_on_chart', 'is_new',
    # Spotify track specific
    'spotify_id', 'spotify_track_name', 'spotify_primary_artist', 'spotify_all_artists',
    'spotify_track_url', 'popularity', 'duration_ms', 'explicit',
    # Spotify album specific
    'album_id', 'album_name', 'album_release_date', 'album_total_tracks', 'album_url',
    'artist_genres', 'album_genres', 'album_label', 'album_copyrights',
    # Placeholders / Hard to get consistently
    'writers', 'producers',
]

# Ensure all desired columns exist, adding any missing ones with None/NaN
print("Ensuring all final columns exist...")
for col in final_columns:
    if col not in df_filtered.columns:
        print(f"  Adding missing final column: '{col}'")
        df_filtered[col] = None # Add missing column with None/NaN

# Reorder DataFrame columns using only columns that actually exist
existing_final_columns = [col for col in final_columns if col in df_filtered.columns]
df_final = df_filtered[existing_final_columns]

# Display some info about the final dataset
print(f"\nFinal processed dataset contains {len(df_final)} rows and {len(df_final.columns)} columns.")
# (Messages about filtering status remain similar)


# Save the final processed DataFrame to a CSV file
if df_final.empty:
    print("\nWARNING: Final DataFrame is empty. Skipping save to CSV.")
else:
    try:
        df_final.to_csv(OUTPUT_CSV_FILE, index=False, encoding='utf-8')
        print(f"\nSuccessfully saved final filtered data to: {OUTPUT_CSV_FILE}")
    except Exception as e:
        print(f"\nERROR: Failed to save data to CSV file. Error: {e}")

print("\n--- Script Execution Complete ---")

Initializing Spotify API...
FATAL ERROR: Could not initialize Spotify API: error: invalid_client, error_description: Invalid client secret
Spotify data fetching will be disabled.

--- Starting Data Collection for hot-100 ---
Starting date generation from most recent Saturday: 2025-03-29
Generated 1 chart dates, from 2025-03-29 back to 2025-03-29.

Fetching Billboard chart 1/1 for date: 2025-03-29
  Successfully fetched Billboard chart with 100 entries.
  Processing Billboard Entry #1: 'Luther' by Kendrick Lamar & SZA
  Processing Billboard Entry #2: 'Evil J0rdan' by Playboi Carti

--- Data Collection Finished ---

--- Processing Collected Data ---
Created initial DataFrame with 2 rows and 28 columns.
Cleaning data and converting types...
Converting numeric columns: ['rank', 'peak_pos', 'last_pos', 'weeks_on_chart', 'popularity', 'duration_ms', 'album_total_tracks']

Calculating Artist Success Score (based on Billboard data)...
  Aggregating best performance for each unique song by arti

The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  df_filtered['artist_success_score'].fillna(0, inplace=True)
