In [11]:
import os
import json
import subprocess
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor, as_completed
import collections
import requests

# Spotify API credentials.
# The SPOTIPY_CLIENT_ID / SPOTIPY_CLIENT_SECRET environment variables take
# precedence; the literals below are only a fallback so existing runs keep working.
# NOTE(review): these literals are secrets committed to the file -- rotate them
# and drop the fallbacks once the environment variables are set everywhere.
SPOTIPY_CLIENT_ID = os.environ.get('SPOTIPY_CLIENT_ID', '11419dac9f88424c93dd910d7d82a6bc')
SPOTIPY_CLIENT_SECRET = os.environ.get('SPOTIPY_CLIENT_SECRET', 'f29c523eb7fe44f686f59e5e9938c87a')

In [13]:
# Function to fetch the recommendation genre seeds from the Spotify client
def get_top_genres(sp):
    """Return the genre names reported by ``sp.recommendation_genre_seeds()``.

    Args:
        sp: Client object exposing ``recommendation_genre_seeds()``.

    Returns:
        The value stored under the ``'genres'`` key of the response. The list
        is returned as-is; no limit or ranking is applied here.
    """
    seed_response = sp.recommendation_genre_seeds()
    return seed_response['genres']

def get_top_songs_for_genre(sp, genre, offset, min_popularity=75, max_tracks=15):
    """Search one page of tracks for a genre and keep the popular, previewable ones.

    Args:
        sp: Client object exposing ``search(q=..., type=..., limit=..., offset=...)``.
        genre: Genre name, inserted into the ``genre:"..."`` search query.
        offset: Offset of the result page to request (50 results per page).
        min_popularity: Minimum ``popularity`` a track needs to be kept
            (default 75, the previously hard-coded threshold).
        max_tracks: Maximum number of tracks returned from this page
            (default 15, the previously hard-coded cap).

    Returns:
        A list of tuples ``(id, name, first artist name, genre, popularity,
        first album image URL or None, preview URL)`` in search-result order.
    """
    results = sp.search(q=f'genre:"{genre}"', type='track', limit=50, offset=offset)

    # Keep only tracks that are popular enough and have a preview to download.
    # Falsy entries (e.g. None placeholders) are skipped instead of raising.
    popular_tracks = [
        track for track in results['tracks']['items']
        if track
        and track['popularity'] >= min_popularity
        and track.get('preview_url') is not None
    ]

    return [
        (
            track['id'],
            track['name'],
            # Guard against an empty artist list rather than raising IndexError
            track['artists'][0]['name'] if track['artists'] else None,
            genre,
            track['popularity'],
            track['album']['images'][0]['url'] if track['album']['images'] else None,
            track['preview_url'],
        )
        for track in popular_tracks[:max_tracks]
    ]

# Function to merge popular songs into a JSON file, skipping tracks already stored
def write_popular_songs_to_file(file_path, popular_songs):
    """Merge ``popular_songs`` into the JSON list stored at ``file_path``.

    Tracks are de-duplicated by Spotify track id, so different tracks that
    share a title are all kept. Entries already in the file win over new
    tuples with the same id.

    Args:
        file_path: Path of the JSON file to read (if present) and rewrite.
        popular_songs: Iterable of ``(id, name, artist, genre, popularity,
            cover_image_url, preview_url)`` tuples.
    """
    songs_by_id = {}
    if os.path.exists(file_path) and os.path.getsize(file_path) > 0:
        try:
            with open(file_path, 'r', encoding='utf-8') as file:
                for saved_song in json.load(file):
                    # Fall back to the name for entries that carry no id
                    songs_by_id[saved_song.get('id', saved_song['name'])] = saved_song
        except json.JSONDecodeError:
            pass  # If JSON is not valid, start with an empty dictionary

    for song_id, song, artist, genre, popularity, cover_image_url, preview_url in popular_songs:
        if song_id not in songs_by_id:
            songs_by_id[song_id] = {
                'id': song_id,
                'name': song,
                'artist': artist,
                'genre': genre,
                'popularity': popularity,
                'cover_image_url': cover_image_url,
                'preview_url': preview_url
            }

    # Open for writing only once the merged data is complete, so a failure
    # while merging cannot leave a truncated file behind.
    with open(file_path, 'w', encoding='utf-8') as file:
        json.dump(list(songs_by_id.values()), file, indent=4)

# Function to merge songs, grouped by artist, into a JSON file
def write_artist_metadata_to_file(file_path, popular_songs):
    """Group ``popular_songs`` by artist and merge them into ``file_path``.

    The file holds a JSON object mapping artist name to a list of song
    records. Existing content is loaded first (invalid JSON is ignored), new
    songs are appended unless a record with the same id is already listed for
    that artist, and the file is then rewritten.

    Args:
        file_path: Path of the JSON file to read (if present) and rewrite.
        popular_songs: Iterable of ``(id, name, artist, genre, popularity,
            cover_image_url, preview_url)`` tuples.
    """
    by_artist = {}
    has_content = os.path.exists(file_path) and os.path.getsize(file_path) > 0
    if has_content:
        try:
            with open(file_path, 'r', encoding='utf-8') as handle:
                by_artist = json.load(handle)
        except json.JSONDecodeError:
            # Unreadable file: fall back to an empty mapping
            pass

    for track_id, title, artist, genre_name, score, cover_url, clip_url in popular_songs:
        bucket = by_artist.setdefault(artist, [])

        # Skip the song when this artist already lists the same track id
        already_listed = False
        for entry in bucket:
            if entry['id'] == track_id:
                already_listed = True
                break
        if already_listed:
            continue

        record = {
            'id': track_id,
            'name': title,
            'artist': artist,
            'genre': genre_name,
            'popularity': score,
            'cover_image_url': cover_url,
            'preview_url': clip_url,
        }
        bucket.append(record)

    with open(file_path, 'w', encoding='utf-8') as handle:
        json.dump(by_artist, handle, indent=4)

# Function to merge songs, grouped by genre, into a JSON file
def write_genre_metadata_to_file(file_path, popular_songs):
    """Group ``popular_songs`` by genre and merge them into ``file_path``.

    The file holds a JSON object mapping genre name to a list of song
    records. Existing content is loaded first (invalid JSON is ignored), new
    songs are appended unless a record with the same id is already listed
    under that genre, and the file is then rewritten.

    Args:
        file_path: Path of the JSON file to read (if present) and rewrite.
        popular_songs: Iterable of ``(id, name, artist, genre, popularity,
            cover_image_url, preview_url)`` tuples.
    """
    catalogue = {}
    if os.path.exists(file_path) and os.path.getsize(file_path) > 0:
        try:
            with open(file_path, 'r', encoding='utf-8') as source:
                catalogue = json.load(source)
        except json.JSONDecodeError:
            # Invalid JSON on disk: continue with an empty catalogue
            pass

    for record in popular_songs:
        track_id, title, performer, genre, score, cover_url, clip_url = record

        if genre not in catalogue:
            catalogue[genre] = []
        listed = catalogue[genre]

        # Only append when no stored record for this genre has the same id
        is_duplicate = any(stored['id'] == track_id for stored in listed)
        if not is_duplicate:
            listed.append({
                'id': track_id,
                'name': title,
                'artist': performer,
                'genre': genre,
                'popularity': score,
                'cover_image_url': cover_url,
                'preview_url': clip_url,
            })

    with open(file_path, 'w', encoding='utf-8') as target:
        json.dump(catalogue, target, indent=4)

# Function to download song preview
def download_preview(url, filename, timeout=30):
    """Download ``url`` and store the response body at ``filename``.

    Args:
        url: URL of the preview clip.
        filename: Destination path; opened in binary write mode.
        timeout: Seconds to wait for the server before giving up, so a
            stalled connection cannot hang the whole run.

    Returns:
        True when the file was written, False when the server answered with
        an error status (nothing is written in that case, so an error page is
        never saved as audio).

    Raises:
        requests.RequestException: On connection failures or timeouts.
    """
    response = requests.get(url, timeout=timeout)
    if not response.ok:
        print(f"Skipping {filename}: HTTP {response.status_code} for {url}")
        return False

    with open(filename, 'wb') as f:
        f.write(response.content)
    return True

# Spotify authentication
sp = spotipy.Spotify(auth_manager=SpotifyClientCredentials(client_id=SPOTIPY_CLIENT_ID, client_secret=SPOTIPY_CLIENT_SECRET))

# Get the list of genres to search
top_genres = get_top_genres(sp)

# Ensure directories exist
os.makedirs('previews', exist_ok=True)
os.makedirs('metadata', exist_ok=True)

# For each genre, page through the search results (offsets 0, 50, ..., 200)
# and stop paging once more than 10 qualifying songs have been collected
popular_songs = []
for genre in tqdm(top_genres, desc="Fetching top songs for genres"):
    count = 0
    for offset in range(0, 250, 50):
        if count > 10:
            break

        song_list = get_top_songs_for_genre(sp, genre, offset)
        popular_songs.extend(song_list)
        count += len(song_list)

        # Download previews. Only the id and preview URL are needed; the other
        # fields go to throwaway names so the outer loop's `genre` is not rebound.
        for song_id, _song, _artist, _genre, _popularity, _cover_image_url, preview_url in song_list:
            if preview_url:
                download_preview(preview_url, os.path.join('previews', f"{song_id}.mp3"))

# Write popular songs to the JSON file only if they don't already exist
popular_songs_file = 'metadata/popular_songs.json'
write_popular_songs_to_file(popular_songs_file, popular_songs)

# Write artist metadata to a JSON file
artist_metadata_file = 'metadata/artist_metadata.json'
write_artist_metadata_to_file(artist_metadata_file, popular_songs)

# Write genre metadata to a JSON file
genres_metadata_file = 'metadata/genres_metadata.json'
write_genre_metadata_to_file(genres_metadata_file, popular_songs)

print(f"Downloaded {len(popular_songs)} song previews and saved metadata.")

Fetching top songs for genres:   0%|          | 0/126 [00:00<?, ?it/s]


NameError: name 'song_id' is not defined

In [None]:
# Download the previews
import requests


# Spotify authentication
sp = spotipy.Spotify(auth_manager=SpotifyClientCredentials(client_id=SPOTIPY_CLIENT_ID, client_secret=SPOTIPY_CLIENT_SECRET))

# Get the list of genres to search
top_genres = get_top_genres(sp)

# Ensure directories exist
os.makedirs('previews', exist_ok=True)
os.makedirs('metadata', exist_ok=True)

# For each genre, page through the search results (offsets 0, 50, ..., 200)
# and stop paging once more than 10 qualifying songs have been collected
popular_songs = []
for genre in tqdm(top_genres, desc="Fetching top songs for genres"):
    count = 0
    for offset in range(0, 250, 50):
        if count > 10:
            break

        song_list = get_top_songs_for_genre(sp, genre, offset)
        popular_songs.extend(song_list)
        count += len(song_list)

        # Download previews. Only the id and preview URL are needed; the other
        # fields go to throwaway names so the outer loop's `genre` is not rebound.
        for song_id, _song, _artist, _genre, _popularity, _cover_image_url, preview_url in song_list:
            if preview_url:
                download_preview(preview_url, os.path.join('previews', f"{song_id}.mp3"))


In [None]:
# Function to read popular songs from a text file or a JSON export
def read_popular_songs(file_path):
    """Read song records from ``file_path``.

    Two formats are accepted:

    * A JSON array of objects that each have ``'name'``, ``'artist'`` and
      ``'genre'`` keys; each object becomes ``[name, artist, genre]``.
    * Plain text with one record per line, fields separated by ``" - "``.
      Blank lines are skipped.

    Args:
        file_path: Path of the UTF-8 encoded file to read.

    Returns:
        A list of lists of field values, one inner list per record.
    """
    with open(file_path, 'r', encoding='utf-8') as file:
        content = file.read()

    # Try the JSON export format first; anything else falls through to text.
    try:
        data = json.loads(content)
    except json.JSONDecodeError:
        data = None

    required_keys = ('name', 'artist', 'genre')
    is_song_export = (
        isinstance(data, list)
        and bool(data)
        and all(
            isinstance(entry, dict) and all(key in entry for key in required_keys)
            for entry in data
        )
    )
    if is_song_export:
        return [[entry['name'], entry['artist'], entry['genre']] for entry in data]

    # Skip blank lines so they do not produce one-field [''] records
    return [
        line.strip().split(" - ")
        for line in content.split('\n')
        if line.strip()
    ]

# Download a song using ytmdl and convert it to 16kHz mono MP3
def download_and_convert(song_name, output_dir='songs'):
    """Download ``song_name`` with ytmdl and convert it to a 16 kHz mono MP3.

    The download goes to ``<output_dir>/temp``; the converted file is written
    to ``<output_dir>/<sanitized name>.mp3``. Download and conversion failures
    are reported with ``print`` and do not raise.

    Args:
        song_name: Search string passed to ytmdl (quotes and colons removed).
        output_dir: Directory that receives the converted file.
    """
    temp_output_dir = os.path.join(output_dir, 'temp')
    os.makedirs(temp_output_dir, exist_ok=True)

    # Sanitize song name for file system
    safe_song_name = song_name.replace('"', '').replace("'", "").replace(":", "")
    # Path separators in a title (e.g. "AC/DC") must not create sub-paths
    file_stem = safe_song_name.replace(' ', '_').replace('/', '_').replace('\\', '_')
    final_output = os.path.join(output_dir, f"{file_stem}.mp3")

    try:
        # Argument lists (no shell) so characters such as $, ` or ; in a song
        # title are passed literally instead of being interpreted by a shell.
        subprocess.run(
            ['ytmdl', safe_song_name, '-q', '-o', temp_output_dir, '--ignore-chapters'],
            check=True,
        )

        # Find the downloaded file (assuming there's only one file in the temp directory)
        downloaded_files = os.listdir(temp_output_dir)
        if not downloaded_files:
            raise FileNotFoundError(f"No files found in {temp_output_dir} after download.")

        temp_output = os.path.join(temp_output_dir, downloaded_files[0])

        # Convert the audio to 16kHz mono MP3 using ffmpeg
        subprocess.run(
            ['ffmpeg', '-i', temp_output, '-ac', '1', '-ar', '16000', final_output],
            check=True,
        )
    except (subprocess.CalledProcessError, FileNotFoundError) as e:
        # FileNotFoundError also covers a missing ytmdl/ffmpeg executable
        print(f"Error downloading or converting {song_name}: {e}")
    finally:
        # Always clear the temp directory so a failed run cannot leave a stale
        # file that the next call would mistake for its own download.
        for leftover in os.listdir(temp_output_dir):
            leftover_path = os.path.join(temp_output_dir, leftover)
            if os.path.isfile(leftover_path):
                os.remove(leftover_path)


# Load the song records from the file written by the earlier cell
popular_songs = read_popular_songs(popular_songs_file)

# Containers for later aggregation; each genre starts with its own empty list
all_songs = []
artists_metadata = {}
genres_metadata = {name: [] for name in top_genres}

# Create directories if they don't exist
for directory in ('songs', 'metadata'):
    os.makedirs(directory, exist_ok=True)

# A single worker keeps the downloads sequential while still reporting
# progress as each job finishes
with ThreadPoolExecutor(max_workers=1) as pool:
    futures = [
        pool.submit(download_and_convert, f"{title} - {performer}")
        for title, performer, _ in popular_songs
    ]
    progress = tqdm(as_completed(futures), total=len(futures), desc="Downloading and converting")
    for finished in progress:
        # Surface any exception raised inside the worker
        finished.result()
