In [5]:
import requests
import random
import string
import base64
import os
import csv
import time

# Spotify API credentials.
# These are read from the environment so that secrets never live in the notebook
# itself. Export SPOTIFY_CLIENT_ID / SPOTIFY_CLIENT_SECRET before starting the
# kernel. Any client id/secret pair that was previously committed in this cell
# should be treated as leaked and rotated in the Spotify developer dashboard.
client_id = os.getenv("SPOTIFY_CLIENT_ID")
client_secret = os.getenv("SPOTIFY_CLIENT_SECRET")

# File path to store artist data
file_path = "spotify_artists.csv"

# Step 1: Obtain an Access Token
def get_access_token(client_id, client_secret):
    """Request an access token with the client-credentials grant.

    Args:
        client_id: Spotify application client id.
        client_secret: Spotify application client secret.

    Returns:
        The ``access_token`` string from the token endpoint's JSON response.

    Raises:
        ValueError: If either credential is missing or empty. Checked before
            any network call so an unset credential is not silently encoded
            as the literal text "None" and sent to the token endpoint.
        Exception: If the token endpoint answers with a non-200 status.
    """
    if not client_id or not client_secret:
        raise ValueError("Spotify client_id and client_secret must both be non-empty.")

    token_url = 'https://accounts.spotify.com/api/token'

    # HTTP Basic auth value: base64("<client_id>:<client_secret>")
    client_creds = f"{client_id}:{client_secret}"
    client_creds_b64 = base64.b64encode(client_creds.encode()).decode()

    headers = {'Authorization': f'Basic {client_creds_b64}'}
    data = {'grant_type': 'client_credentials'}

    response = requests.post(token_url, headers=headers, data=data)
    if response.status_code != 200:
        raise Exception(f"Failed to get access token: {response.status_code} - {response.text}")

    print("Successfully obtained access token.")
    return response.json()['access_token']

# Step 2: Search Spotify for random artist IDs
def get_random_artist_ids(access_token, limit=50):
    """Search for artists using a random three-letter query and return their IDs.

    Args:
        access_token: Bearer token sent in the Authorization header.
        limit: Value passed as the search ``limit`` parameter.

    Returns:
        A list of artist ID strings taken from the search response, or an
        empty list when the response status is neither 200 nor 429.
    """
    search_endpoint = "https://api.spotify.com/v1/search"
    auth_headers = {'Authorization': f'Bearer {access_token}'}

    while True:
        # A new random query is drawn for every attempt, retries included.
        query = ''.join(random.choice(string.ascii_lowercase) for _ in range(3))
        response = requests.get(
            search_endpoint,
            headers=auth_headers,
            params={'q': query, 'type': 'artist', 'limit': limit},
        )

        if response.status_code == 429:  # Rate limit: wait, then try again
            retry_after = int(response.headers.get("Retry-After", 5))
            print(f"Rate limit hit. Retrying in {retry_after} seconds.")
            time.sleep(retry_after)
            continue

        if response.status_code != 200:
            print(f"Failed to search for artist IDs: {response.status_code} - {response.text}")
            return []

        found_ids = [item['id'] for item in response.json()['artists']['items']]
        print(f"Retrieved {len(found_ids)} artist IDs.")
        return found_ids

# Step 3: Get Artist Details for a List of IDs
def get_artists_details(access_token, artist_ids):
    """Fetch artist objects for a list of artist IDs in a single request.

    Args:
        access_token: Bearer token sent in the Authorization header.
        artist_ids: Artist ID strings; they are comma-joined into the ``ids``
            query parameter.

    Returns:
        The ``artists`` list from the JSON response (empty list if the key is
        absent), or an empty list when the status is neither 200 nor 429.
    """
    endpoint = "https://api.spotify.com/v1/artists"
    auth_headers = {'Authorization': f'Bearer {access_token}'}
    query = {'ids': ','.join(artist_ids)}

    while True:
        response = requests.get(endpoint, headers=auth_headers, params=query)

        if response.status_code == 200:
            return response.json().get('artists', [])

        if response.status_code != 429:
            print(f"Failed to retrieve artist details: {response.status_code} - {response.text}")
            return []

        # Rate limited: honour Retry-After (default 5 s) and repeat the request.
        retry_after = int(response.headers.get("Retry-After", 5))
        print(f"Rate limit hit. Retrying in {retry_after} seconds.")
        time.sleep(retry_after)

# Step 4: Write artist details to CSV
def write_artists_to_csv(artists):
    """Append artist rows to the CSV at ``file_path``.

    The header row is written when the file does not exist yet or exists but
    is empty. Falsy entries in ``artists`` (e.g. ``None`` placeholders) are
    skipped instead of raising, mirroring the guard used when writing audio
    features.

    Args:
        artists: Iterable of artist dicts with the keys ``id``, ``name``,
            ``popularity``, ``followers`` (containing ``total``) and
            ``external_urls`` (containing ``spotify``); ``genres`` is optional.
    """
    # A zero-byte file left by an interrupted run still needs its header.
    needs_header = not os.path.isfile(file_path) or os.path.getsize(file_path) == 0
    written = 0

    with open(file_path, mode='a', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        if needs_header:
            writer.writerow(['id', 'name', 'popularity', 'followers', 'genres', 'url'])  # CSV headers
        for artist in artists:
            if not artist:
                continue  # nothing to write for an empty/None entry
            genres = ", ".join(artist.get('genres', []))
            writer.writerow([
                artist['id'],
                artist['name'],
                artist['popularity'],
                artist['followers']['total'],
                genres,
                artist['external_urls']['spotify']
            ])
            written += 1

    # Report rows actually written, not the number of input entries.
    print(f"Wrote {written} artists to CSV.")

# Step 5: Main Function to Collect 100,000 Artist Names
def collect_artists(access_token, target_count=100000, max_empty_searches=20):
    """Collect artist details in batches of up to 50 until a target is reached.

    IDs are de-duplicated across the whole run (not just within a batch), so
    an artist that shows up in several random searches is fetched, written and
    counted only once.

    Args:
        access_token: Bearer token passed through to the search and artist
            lookups.
        target_count: Stop once at least this many artists have been written.
        max_empty_searches: Give up after this many consecutive searches that
            return no IDs at all (for example when every request fails),
            instead of looping forever.

    Returns:
        The number of artists written during this call.
    """
    pending_ids = set()   # IDs waiting to go into the next details request
    seen_ids = set()      # IDs already fetched and written in this run
    total_collected = 0
    empty_searches = 0

    while total_collected < target_count:
        # Step 5.1: Gather unique, not-yet-processed IDs until a batch is full
        while len(pending_ids) < 50:
            new_ids = get_random_artist_ids(access_token, limit=50 - len(pending_ids))
            if not new_ids:
                empty_searches += 1
                if empty_searches >= max_empty_searches:
                    print(f"Stopping: {empty_searches} consecutive searches returned no artist IDs.")
                    return total_collected
                continue
            empty_searches = 0
            pending_ids.update(artist_id for artist_id in new_ids if artist_id not in seen_ids)

        # Step 5.2: Get artist details in batches of up to 50
        batch_ids = list(pending_ids)[:50]
        # Drop empty/None entries so they are neither written nor counted.
        artist_details = [artist for artist in get_artists_details(access_token, batch_ids) if artist]
        write_artists_to_csv(artist_details)

        total_collected += len(artist_details)
        if artist_details:
            # Only mark the batch as done when the lookup actually succeeded,
            # so IDs from a failed request may be picked up again later.
            seen_ids.update(batch_ids)
        pending_ids.difference_update(batch_ids)  # Remove processed IDs

        print(f"Total artists collected so far: {total_collected}")

    return total_collected
# Run the collection process
if __name__ == "__main__":
    # Fetch one token up front; it is reused for every request made by
    # collect_artists (this cell has no token-refresh logic).
    access_token = get_access_token(client_id, client_secret)
    # Keep searching and writing batches to file_path until 100,000 artists
    # have been counted.
    collect_artists(access_token, target_count=100000)


Successfully obtained access token.
Retrieved 1 artist IDs.
Retrieved 47 artist IDs.
Retrieved 2 artist IDs.
Wrote 50 artists to CSV.
Total artists collected so far: 50
Retrieved 50 artist IDs.
Wrote 50 artists to CSV.
Total artists collected so far: 100
Retrieved 50 artist IDs.
Wrote 50 artists to CSV.
Total artists collected so far: 150
Retrieved 50 artist IDs.
Wrote 50 artists to CSV.
Total artists collected so far: 200
Retrieved 50 artist IDs.
Wrote 50 artists to CSV.
Total artists collected so far: 250
Retrieved 50 artist IDs.
Wrote 50 artists to CSV.
Total artists collected so far: 300
Retrieved 50 artist IDs.
Wrote 50 artists to CSV.
Total artists collected so far: 350
Retrieved 8 artist IDs.
Retrieved 42 artist IDs.
Wrote 50 artists to CSV.
Total artists collected so far: 400
Retrieved 50 artist IDs.
Wrote 50 artists to CSV.
Total artists collected so far: 450
Retrieved 16 artist IDs.
Retrieved 34 artist IDs.
Wrote 50 artists to CSV.
Total artists collected so far: 500
Retrieve

In [None]:
import requests
import base64
import os
import csv
import time
from dotenv import load_dotenv, find_dotenv

# Populate the process environment from the dev .env file
load_dotenv(find_dotenv('config/dev.env'))

# Spotify API credentials, taken from the environment
client_id = os.environ.get("SPOTIFY_CLIENT_ID")
client_secret = os.environ.get("SPOTIFY_CLIENT_SECRET")

# Abort early when either credential is missing or empty
if not (client_id and client_secret):
    raise EnvironmentError("Failed to load Spotify credentials from the .env file.")

# CSV files read and written by the steps below
artist_file_path = "spotify_artists.csv"
track_file_path = "spotify_top_tracks.csv"
audio_features_file_path = "spotify_audio_features.csv"

# Module-level token state: the current bearer token and the time.time()
# value after which it is considered expired
access_token = token_expiration_time = None

# Step 1: Obtain an Access Token and update expiration time
def get_access_token(client_id, client_secret):
    """Obtain an access token and record when it should be refreshed.

    Updates the module-level ``access_token`` and ``token_expiration_time``.
    The lifetime comes from the ``expires_in`` field of the token response
    (falling back to 3600 seconds if the field is absent), minus a 60-second
    margin so the token is refreshed slightly before it actually expires.

    Args:
        client_id: Spotify application client id.
        client_secret: Spotify application client secret.

    Returns:
        The new access token string (the globals are updated as well).

    Raises:
        Exception: If the token endpoint answers with a non-200 status.
    """
    global access_token, token_expiration_time

    token_url = 'https://accounts.spotify.com/api/token'

    # HTTP Basic auth value: base64("<client_id>:<client_secret>")
    client_creds = f"{client_id}:{client_secret}"
    client_creds_b64 = base64.b64encode(client_creds.encode()).decode()

    headers = {'Authorization': f'Basic {client_creds_b64}'}
    data = {'grant_type': 'client_credentials'}

    response = requests.post(token_url, headers=headers, data=data)
    if response.status_code != 200:
        raise Exception(f"Failed to get access token: {response.status_code} - {response.text}")

    print("Successfully obtained access token.")
    payload = response.json()
    access_token = payload['access_token']

    # Trust the server-reported lifetime rather than assuming one hour.
    lifetime_seconds = payload.get('expires_in', 3600)
    token_expiration_time = time.time() + max(0, lifetime_seconds - 60)
    return access_token

# Step 2: Check if the token is expired and refresh if necessary
def check_and_refresh_token():
    """Request a new token when none is stored or the stored one has expired."""
    # The expiry comparison is only evaluated when a token is present, so an
    # unset expiry time is never compared against the clock.
    token_is_usable = bool(access_token) and time.time() < token_expiration_time
    if not token_is_usable:
        get_access_token(client_id, client_secret)
# Step 3: Read unique artist IDs from the CSV file
def read_artist_ids(file_path):
    """Return the set of distinct values in the ``id`` column of a CSV file.

    Args:
        file_path: Path to a UTF-8 CSV file whose header includes ``id``.

    Returns:
        A set of artist ID strings.
    """
    with open(file_path, mode='r', encoding='utf-8') as csv_file:
        unique_ids = {record['id'] for record in csv.DictReader(csv_file)}
    print(f"Loaded {len(unique_ids)} unique artist IDs from file.")
    return unique_ids

# Step 4: Get Top Tracks for Multiple Artist IDs in Batch
def get_top_tracks_batch(artist_ids, country='US', limit=5, max_retries=5):
    """Fetch top tracks for each artist and append them to the track CSV.

    Artists whose ID already appears in the ``artist_id`` column of
    ``track_file_path`` are skipped, so an interrupted run can be resumed.

    Args:
        artist_ids: Iterable of artist ID strings.
        country: Value sent as the ``market`` query parameter.
        limit: Maximum number of tracks kept per artist.
        max_retries: How many times a rate-limited (429) request is retried
            for the same artist before it is reported as failed and skipped.

    Returns:
        A list of track rows that have not been written. Rows are flushed to
        the CSV after every artist, so this is always empty; it is kept for
        backward compatibility.
    """
    check_and_refresh_token()  # Ensure a token exists before the first request
    top_tracks_data = []

    # Load existing artist IDs from the track file to avoid re-fetching
    existing_artist_ids = set()
    if os.path.isfile(track_file_path):
        with open(track_file_path, mode='r', encoding='utf-8') as f:
            existing_artist_ids = {row['artist_id'] for row in csv.DictReader(f)}

    artist_ids = [artist_id for artist_id in artist_ids if artist_id not in existing_artist_ids]

    start_time = time.time()  # Track the start time for pauses
    for artist_id in artist_ids:
        url = f"https://api.spotify.com/v1/artists/{artist_id}/top-tracks"
        params = {'market': country}

        for _ in range(max(0, max_retries) + 1):
            check_and_refresh_token()
            # Rebuild the header on every attempt: the token may just have
            # been refreshed, and a header built once would keep the old one.
            headers = {'Authorization': f'Bearer {access_token}'}
            response = requests.get(url, headers=headers, params=params)
            time.sleep(1)  # 1-second delay after each request

            if response.status_code != 429:
                break

            # Rate limited: wait as instructed and retry the same artist
            # instead of abandoning the rest of the list.
            retry_after = int(response.headers.get("Retry-After", 5))
            print(f"Rate limit hit. Retrying in {retry_after} seconds.")
            time.sleep(retry_after)

        if response.status_code == 200:
            top_tracks = response.json()['tracks'][:limit]
            rows = [
                {
                    'artist_id': artist_id,
                    'track_id': track['id'],
                    'track_name': track['name'],
                    'popularity': track['popularity'],
                    'album_name': track['album']['name'],
                    'release_date': track['album']['release_date'],
                    'external_url': track['external_urls']['spotify']
                }
                for track in top_tracks
            ]
            # Write this artist's rows immediately so progress survives a crash
            write_tracks_to_csv(rows)

            # After 30 seconds of activity, pause for 30 seconds
            if time.time() - start_time >= 30:
                print("Pausing for 30 seconds to avoid rate limits.")
                time.sleep(30)
                start_time = time.time()  # Reset the timer after the pause
        else:
            print(f"Failed to retrieve top tracks for artist ID {artist_id}: {response.status_code} - {response.text}")

    return top_tracks_data

# Step 5: Write track details to CSV
def write_tracks_to_csv(tracks):
    """Append track rows to the CSV at ``track_file_path``.

    A header row is written first when the file does not exist yet.

    Args:
        tracks: List of dicts keyed by the column names listed below.
    """
    columns = (
        'artist_id', 'track_id', 'track_name', 'popularity', 'album_name',
        'release_date', 'external_url',
    )
    needs_header = not os.path.isfile(track_file_path)

    with open(track_file_path, mode='a', newline='', encoding='utf-8') as out_file:
        out = csv.writer(out_file)
        if needs_header:
            out.writerow(columns)
        # Column names double as the dict keys, in the same order.
        out.writerows([entry[column] for column in columns] for entry in tracks)

    print(f"Wrote {len(tracks)} tracks to CSV.")

# Step 6: Read unique track IDs from the CSV file
def read_track_ids(file_path):
    """Return the set of distinct values in the ``track_id`` column of a CSV file.

    Args:
        file_path: Path to a UTF-8 CSV file whose header includes ``track_id``.

    Returns:
        A set of track ID strings.
    """
    with open(file_path, mode='r', encoding='utf-8') as csv_file:
        unique_ids = {record['track_id'] for record in csv.DictReader(csv_file)}
    print(f"Loaded {len(unique_ids)} unique track IDs from file.")
    return unique_ids

# Step 7: Get Audio Features for Multiple Track IDs in Batch
def get_audio_features_batch(track_ids, batch_size=100, max_retries=5):
    """Fetch audio features in batches and append them to the features CSV.

    Tracks whose ID already appears in the ``track_id`` column of
    ``audio_features_file_path`` are skipped, so an interrupted run can be
    resumed.

    Args:
        track_ids: Iterable of track ID strings.
        batch_size: Number of IDs sent per request.
        max_retries: How many times a rate-limited (429) request is retried
            for the same batch before it is reported as failed and skipped.
    """
    check_and_refresh_token()  # Ensure a token exists before the first request

    # Load track IDs that already have features so they are not re-fetched
    existing_track_ids = set()
    if os.path.isfile(audio_features_file_path):
        with open(audio_features_file_path, mode='r', encoding='utf-8') as f:
            existing_track_ids = {row['track_id'] for row in csv.DictReader(f)}

    track_ids = [track_id for track_id in track_ids if track_id not in existing_track_ids]

    url = "https://api.spotify.com/v1/audio-features"
    start_time = time.time()  # Track the start time for pauses
    for i in range(0, len(track_ids), batch_size):
        batch_track_ids = track_ids[i:i + batch_size]

        # After 30 seconds of activity, pause for 30 seconds
        if time.time() - start_time >= 30:
            print("Pausing for 30 seconds to avoid rate limits.")
            time.sleep(30)
            start_time = time.time()  # Reset the timer after the pause

        params = {'ids': ','.join(batch_track_ids)}

        for _ in range(max(0, max_retries) + 1):
            check_and_refresh_token()
            # Rebuild the header on every attempt: the token may just have
            # been refreshed, and a header built once would keep the old one.
            headers = {'Authorization': f'Bearer {access_token}'}
            response = requests.get(url, headers=headers, params=params)
            time.sleep(1)  # 1-second delay after each request

            if response.status_code != 429:
                break

            # Rate limited: wait as instructed and retry the same batch
            # rather than silently moving on to the next one.
            retry_after = int(response.headers.get("Retry-After", 5))
            print(f"Rate limit hit. Retrying in {retry_after} seconds.")
            time.sleep(retry_after)

        if response.status_code == 200:
            audio_features = response.json()['audio_features']
            write_audio_features_to_csv(audio_features)  # Write each batch to CSV immediately
        else:
            print(f"Failed to retrieve audio features: {response.status_code} - {response.text}")

# Step 8: Write audio features to CSV
def write_audio_features_to_csv(audio_features):
    """Append audio-feature rows to the CSV at ``audio_features_file_path``.

    A header row is written first when the file does not exist yet. Falsy
    entries in ``audio_features`` are skipped, although the printed count
    still reflects the length of the input list.

    Args:
        audio_features: List of audio-feature dicts (entries may be falsy).
    """
    feature_keys = (
        'danceability', 'energy', 'key', 'loudness', 'mode', 'speechiness',
        'acousticness', 'instrumentalness', 'liveness', 'valence', 'tempo',
    )
    needs_header = not os.path.isfile(audio_features_file_path)

    with open(audio_features_file_path, mode='a', newline='', encoding='utf-8') as out_file:
        out = csv.writer(out_file)
        if needs_header:
            # The first column is headed 'track_id' but is read from the 'id' key.
            out.writerow(('track_id',) + feature_keys)
        out.writerows(
            [entry['id']] + [entry[key] for key in feature_keys]
            for entry in audio_features
            if entry
        )

    print(f"Wrote {len(audio_features)} audio features to CSV.")

# Run the collection process
if __name__ == "__main__":
    # Step 1: Initialize and get access token
    # (stores the token in the module-level access_token global)
    get_access_token(client_id, client_secret)
    
    # NOTE(review): Steps 2-3 are disabled; presumably the top-tracks CSV was
    # already produced by an earlier run — confirm before re-enabling.
    # # Step 2: Load artist IDs
    # artist_ids = read_artist_ids(artist_file_path)
    
    # # Step 3: Collect top tracks in batches
    # get_top_tracks_batch(artist_ids, country='US', limit=10)
    
    # Step 4: Load track IDs
    # Reads the distinct track_id values from track_file_path, so that file
    # must already exist when this cell runs.
    track_ids = read_track_ids(track_file_path)
    
    # Step 5: Collect audio features in batches
    # Results are appended to audio_features_file_path as each batch arrives.
    get_audio_features_batch(track_ids, batch_size=100)


Successfully obtained access token.
Loaded 135074 unique track IDs from file.
Wrote 100 audio features to CSV.
Wrote 100 audio features to CSV.
Wrote 100 audio features to CSV.
Wrote 100 audio features to CSV.
Wrote 100 audio features to CSV.
Wrote 100 audio features to CSV.
Wrote 100 audio features to CSV.
Wrote 100 audio features to CSV.
Wrote 100 audio features to CSV.
Wrote 100 audio features to CSV.
Wrote 100 audio features to CSV.
Wrote 100 audio features to CSV.
Wrote 100 audio features to CSV.
Wrote 100 audio features to CSV.
Wrote 100 audio features to CSV.
Wrote 100 audio features to CSV.
Wrote 100 audio features to CSV.
Wrote 100 audio features to CSV.
Wrote 100 audio features to CSV.
Wrote 100 audio features to CSV.
Wrote 100 audio features to CSV.
Wrote 100 audio features to CSV.
Wrote 100 audio features to CSV.
Pausing for 30 seconds to avoid rate limits.
Wrote 100 audio features to CSV.
Wrote 100 audio features to CSV.
Wrote 100 audio features to CSV.
Wrote 100 audio fea