In [8]:
import os
import base64
import requests
import time
from sqlalchemy import create_engine, Column, String, Integer, Float, Boolean, Date, Table, ForeignKey
from sqlalchemy.orm import declarative_base, sessionmaker, relationship
from dotenv import load_dotenv
from datetime import datetime

# Load .env file
load_dotenv()


def _require_env(name):
    """Return the value of environment variable ``name``.

    Raises:
        RuntimeError: if the variable is unset or empty, so a missing setting
            is reported up front instead of as an obscure auth/connection error.
    """
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value


# Load API and DB credentials from the environment (populated from .env above).
# Secrets are deliberately NOT written in the notebook: notebooks get shared and
# committed together with their source.
# NOTE(review): the variable names are assumed to mirror the constant names —
# adjust them if your .env uses different keys.
CLIENT_ID = _require_env("CLIENT_ID")
CLIENT_SECRET = _require_env("CLIENT_SECRET")
DATABASE_URL = _require_env("DATABASE_URL")


In [9]:
# Set up SQLAlchemy
# Engine bound to the configured database URL.
engine = create_engine(DATABASE_URL)

# Declarative base that every ORM model class in this notebook inherits from.
Base = declarative_base()

# Session factory plus the single shared session that the loader functions in
# later cells use for all merges and the final commit.
Session = sessionmaker(bind=engine)
session = Session()

In [10]:
# Get Spotify API token
def get_spotify_token():
    """Request an access token from Spotify using the client-credentials grant.

    The client id and secret are sent as an HTTP Basic ``Authorization`` header
    to the accounts endpoint.

    Returns:
        str: the ``access_token`` field of the JSON response.

    Raises:
        requests.exceptions.RequestException: on network failure, timeout, or a
            non-2xx response (e.g. wrong client id/secret).
        KeyError: if a successful response carries no ``access_token``.
    """
    auth_header = base64.b64encode(f"{CLIENT_ID}:{CLIENT_SECRET}".encode()).decode()
    response = requests.post(
        "https://accounts.spotify.com/api/token",
        headers={"Authorization": f"Basic {auth_header}"},
        data={"grant_type": "client_credentials"},
        # Without a timeout a stalled connection would hang the cell forever.
        timeout=30,
    )
    # Surface bad credentials / server errors as an HTTP error rather than as a
    # confusing KeyError on the missing "access_token" key.
    response.raise_for_status()
    return response.json()["access_token"]


# The token is fetched once, when this cell runs, and reused by every request.
# NOTE(review): tokens presumably expire after a while; a very long load may
# need to re-run this cell — confirm against the Spotify docs.
SPOTIFY_TOKEN = get_spotify_token()
HEADERS = {"Authorization": f"Bearer {SPOTIFY_TOKEN}"}

In [11]:
# SQLAlchemy models
class Artist(Base):
    """ORM model for the ``artists`` table: one row per artist."""
    __tablename__ = "artists"
    # Primary key; fetch_artist fills it from the "id" field of the API response.
    artist_id = Column(String, primary_key=True)
    name = Column(String)
    # Taken from the API response's "popularity" field.
    popularity = Column(Integer)
    # Taken from response["followers"]["total"].
    followers = Column(Integer)
    # Albums whose artist_id points at this artist.
    albums = relationship("Album", back_populates="artist")
    # Association rows (TrackArtist) linking this artist to tracks — these are
    # link objects, not Track instances.
    tracks = relationship("TrackArtist", back_populates="artist")
    # One ArtistGenre row per genre string.
    genres = relationship("ArtistGenre", back_populates="artist")

class Album(Base):
    """ORM model for the ``albums`` table: one row per album."""
    __tablename__ = "albums"
    album_id = Column(String, primary_key=True)
    title = Column(String)
    # Single owning artist. load_playlist_data fills this with the FIRST artist
    # listed on the track being processed, not with an artist list from the album.
    artist_id = Column(String, ForeignKey("artists.artist_id"))
    # Parsed by parse_date; may be None when the date string cannot be parsed.
    release_date = Column(Date)
    album_type = Column(String)
    total_tracks = Column(Integer)
    artist = relationship("Artist", back_populates="albums")
    tracks = relationship("Track", back_populates="album")

class Track(Base):
    """ORM model for the ``tracks`` table: one row per track."""
    __tablename__ = "tracks"
    track_id = Column(String, primary_key=True)
    title = Column(String)
    album_id = Column(String, ForeignKey("albums.album_id"))
    # Track length in milliseconds; the loader skips tracks where this is <= 0.
    duration_ms = Column(Integer)
    explicit = Column(Boolean)
    popularity = Column(Integer)
    # The loader stores the album's release date here (same value as
    # Album.release_date for the track's album).
    release_date = Column(Date)
    album = relationship("Album", back_populates="tracks")
    # Association rows (TrackArtist), one per credited artist.
    artists = relationship("TrackArtist", back_populates="track")
    # One TrackMarket row per market the track is available in.
    markets = relationship("TrackMarket", back_populates="track")
    # One TrackCollaboration row per (primary artist, featured artist) pair.
    collaborations = relationship("TrackCollaboration", back_populates="track")

class TrackArtist(Base):
    """Association row linking one track to one of its artists.

    The primary key is the (track_id, artist_id) pair, so each link is stored
    at most once.
    """
    __tablename__ = "track_artists"
    track_id = Column(String, ForeignKey("tracks.track_id"), primary_key=True)
    artist_id = Column(String, ForeignKey("artists.artist_id"), primary_key=True)
    track = relationship("Track", back_populates="artists")
    artist = relationship("Artist", back_populates="tracks")

class TrackMarket(Base):
    """One market in which a track is available.

    The primary key is the (track_id, market) pair. The loader creates one row
    per entry of the track response's "available_markets" list.
    """
    __tablename__ = "track_markets"
    track_id = Column(String, ForeignKey("tracks.track_id"), primary_key=True)
    market = Column(String, primary_key=True)
    track = relationship("Track", back_populates="markets")

class TrackCollaboration(Base):
    """A (primary artist, featured artist) pairing on one track.

    The primary key is the full (track_id, primary_artist_id,
    featured_artist_id) triple. load_playlist_data creates a row for every
    artist after the first one listed on a track, using the first listed artist
    as the primary.
    """
    __tablename__ = "track_collaborations"
    track_id = Column(String, ForeignKey("tracks.track_id"), primary_key=True)
    # Both artist columns reference artists.artist_id; no relationship() to
    # Artist is declared here, only the raw ids are stored.
    primary_artist_id = Column(String, ForeignKey("artists.artist_id"), primary_key=True)
    featured_artist_id = Column(String, ForeignKey("artists.artist_id"), primary_key=True)
    track = relationship("Track", back_populates="collaborations")

class ArtistGenre(Base):
    """One genre label attached to an artist.

    The primary key is the (artist_id, genre) pair. fetch_artist builds one row
    per entry of the artist response's "genres" list.
    """
    __tablename__ = "artist_genres"
    artist_id = Column(String, ForeignKey("artists.artist_id"), primary_key=True)
    genre = Column(String, primary_key=True)
    artist = relationship("Artist", back_populates="genres")

In [12]:
# Utility Functions
def parse_date(date_str):
    """Parse a release-date string of year, year-month, or full-date precision.

    The format is chosen by string length: 4 characters -> ``YYYY``,
    7 characters -> ``YYYY-MM``, anything else -> ``YYYY-MM-DD``. Missing month
    or day components default to 1.

    Args:
        date_str: the date string, or None / empty when no date is known.

    Returns:
        datetime.date, or None when the input is missing, not a string, or does
        not match the expected format.
    """
    if not date_str:
        return None

    formats_by_length = {4: "%Y", 7: "%Y-%m"}
    try:
        fmt = formats_by_length.get(len(date_str), "%Y-%m-%d")
        return datetime.strptime(date_str, fmt).date()
    except (TypeError, ValueError):
        # TypeError: non-string input; ValueError: malformed date text.
        # Anything else (e.g. KeyboardInterrupt) must propagate, which a bare
        # ``except:`` would have swallowed.
        return None

In [13]:
def fetch_artist(artist_id):
    """Fetch one artist from the Spotify Web API and build its ORM objects.

    On an HTTP 429 the request is retried once after waiting for the number of
    seconds given in the ``Retry-After`` header (1 second if absent).

    Args:
        artist_id: the Spotify artist id to look up.

    Returns:
        tuple: ``(Artist, [ArtistGenre, ...])`` on success. On any failure
        (network error, non-200 status, unparsable JSON, error payload) a
        message is printed and ``(None, [])`` is returned. The objects are NOT
        added to the session here; the caller merges them.
    """
    url = f"https://api.spotify.com/v1/artists/{artist_id}"
    try:
        # A timeout keeps one stalled connection from hanging the whole load;
        # it surfaces as a RequestException and is handled below.
        res = requests.get(url, headers=HEADERS, timeout=30)
        if res.status_code == 429:
            retry_after = int(res.headers.get("Retry-After", 1))
            print(f"⏳ Rate limited. Waiting {retry_after} seconds before retrying artist fetch...")
            time.sleep(retry_after)
            res = requests.get(url, headers=HEADERS, timeout=30)
        if res.status_code != 200:
            print(f"⚠️ Request failed ({res.status_code}) for URL: {url}")
            print("Response content:", res.text)
            return None, []
        response = res.json()
    except requests.exceptions.RequestException as e:
        print(f"🚨 Request failed for artist {artist_id}: {e}")
        return None, []
    except ValueError:
        print(f"⚠️ Failed to parse JSON from response for URL: {url}")
        print("Raw response content:", res.text)
        return None, []

    if "error" in response or "id" not in response:
        print(f"⚠️ Failed to fetch artist: {artist_id} — {response.get('error', {}).get('message', 'Unknown error')}")
        return None, []

    artist = Artist(
        artist_id=response["id"],
        name=response["name"],
        popularity=response["popularity"],
        followers=response["followers"]["total"]
    )
    genres = [ArtistGenre(artist_id=artist.artist_id, genre=genre) for genre in response.get("genres", [])]

    # Short pause after each successful lookup (presumably to stay under the
    # API rate limit).
    time.sleep(0.1)
    return artist, genres


# Seconds to wait on any single Spotify request before giving up, so a stalled
# connection cannot hang the whole load.
_REQUEST_TIMEOUT = 30


def _spotify_get(url, what):
    """GET ``url`` with the shared auth headers, retrying once after an HTTP 429.

    ``what`` only appears in the rate-limit log line (e.g. "playlist", "track").
    """
    res = requests.get(url, headers=HEADERS, timeout=_REQUEST_TIMEOUT)
    if res.status_code == 429:
        try:
            retry_after = int(res.headers.get("Retry-After", 1))
        except ValueError:
            # Header present but not a plain number of seconds.
            retry_after = 1
        print(f"⏳ Rate limited. Waiting {retry_after} seconds before retrying {what} fetch...")
        time.sleep(retry_after)
        res = requests.get(url, headers=HEADERS, timeout=_REQUEST_TIMEOUT)
    return res


def _cached_artist(artist_id, artist_cache):
    """Return ``(artist, genres)`` for an id, calling fetch_artist at most once per id.

    Failed lookups are cached too (as ``(None, [])``), so a broken id is not
    re-requested for every track it appears on.
    """
    if artist_id not in artist_cache:
        artist_cache[artist_id] = fetch_artist(artist_id)
    return artist_cache[artist_id]


def _merge_artist(artist, genres):
    """Merge an artist and its genre rows into the shared session."""
    session.merge(artist)
    for genre in genres:
        session.merge(genre)


def _fetch_track_markets(track_id):
    """Return the track's ``available_markets`` list, or None if the lookup failed."""
    track_url = f"https://api.spotify.com/v1/tracks/{track_id}"
    try:
        track_res = _spotify_get(track_url, "track")
        if track_res.status_code != 200:
            print(f"⚠️ Request failed ({track_res.status_code}) for track URL: {track_url}")
            print("Response content:", track_res.text)
            return None
        track_full = track_res.json()
    except requests.exceptions.RequestException as e:
        # A network error on one track must not abort the whole playlist load.
        print(f"🚨 Request failed for track {track_id}: {e}")
        return None
    except ValueError:
        print(f"⚠️ Failed to parse JSON from response for track URL: {track_url}")
        print("Raw response content:", track_res.text)
        return None
    return track_full.get("available_markets") or []


def _load_playlist_item(item, artist_cache):
    """Merge one playlist item (artists, album, track, links, markets) into the session.

    Returns:
        bool: True when the track was fully processed and should be counted;
        False when it was skipped or its market lookup failed.
    """
    track_info = item.get("track")
    if not track_info:
        return False

    track_id = track_info["id"]
    print(f"🎵 Processing track: {track_info.get('name', '[unknown]')} — {track_id}")

    album_info = track_info.get("album") or {}
    artist_objs = track_info.get("artists") or []
    if not track_id or not album_info.get("id") or not artist_objs:
        # Every row written below is keyed on these ids; without them the
        # inserts would fail (None primary key) or the code would raise.
        print(f"Skipping track {track_id} due to missing track id, album, or artists.")
        return False

    release_date = parse_date(album_info.get("release_date"))

    # Ensure primary artist exists before inserting album
    primary_artist_id = artist_objs[0]["id"]
    primary_artist, primary_genres = _cached_artist(primary_artist_id, artist_cache)
    if primary_artist:
        _merge_artist(primary_artist, primary_genres)
    elif not primary_artist_id or session.get(Artist, primary_artist_id) is None:
        # The album row carries a foreign key to this artist; inserting it
        # without the artist row would fail the whole flush.
        print(f"Skipping track {track_id}: primary artist {primary_artist_id} could not be loaded.")
        return False

    album = Album(
        album_id=album_info["id"],
        title=album_info["name"],
        artist_id=primary_artist_id,
        release_date=release_date,
        album_type=album_info["album_type"],
        total_tracks=album_info["total_tracks"]
    )
    session.merge(album)

    duration_ms = track_info["duration_ms"] or 0
    title = track_info["name"] or "Unknown Title"

    if duration_ms <= 0 or not title.strip():
        print(f"Skipping track {track_id} due to invalid duration or empty title.")
        return False

    session.merge(Track(
        track_id=track_id,
        title=title,
        album_id=album.album_id,
        duration_ms=duration_ms,
        explicit=track_info["explicit"],
        popularity=track_info.get("popularity"),
        release_date=release_date
    ))

    for i, artist_obj in enumerate(artist_objs):
        artist, genres = _cached_artist(artist_obj["id"], artist_cache)
        if not artist:
            # Lookup failed: no artist row, so no link rows either.
            continue
        _merge_artist(artist, genres)

        session.merge(TrackArtist(track_id=track_id, artist_id=artist.artist_id))

        # Every artist after the first is recorded as featured alongside the
        # first-listed (primary) artist.
        if i > 0:
            session.merge(TrackCollaboration(
                track_id=track_id,
                primary_artist_id=primary_artist_id,
                featured_artist_id=artist.artist_id
            ))

    markets = _fetch_track_markets(track_id)
    if markets is None:
        return False
    for market in markets:
        session.merge(TrackMarket(track_id=track_id, market=market))

    return True


def load_playlist_data(playlist_id):
    """Load every track of a Spotify playlist into the database.

    Pages through the playlist's tracks (100 per request) and, for each track,
    merges its artists, genres, album, the track itself, track–artist links,
    collaborations and available markets into the shared ``session``. A single
    commit is issued at the end.

    If fetching a playlist page fails, paging stops and whatever was merged up
    to that point is still committed. If an unexpected exception occurs, the
    session is rolled back (so it stays usable when the cell is re-run) and the
    exception is re-raised.

    Args:
        playlist_id: the Spotify playlist id.
    """
    artist_cache = {}
    track_counter = 0
    url = f"https://api.spotify.com/v1/playlists/{playlist_id}/tracks?limit=100"

    try:
        while url:
            try:
                res = _spotify_get(url, "playlist")
                res.raise_for_status()
                response = res.json()
            except requests.exceptions.RequestException as e:
                print(f"🚨 Request failed for playlist page: {e}")
                break
            except ValueError:
                print(f"⚠️ Failed to parse playlist JSON from response for URL: {url}")
                break

            time.sleep(0.05)

            for item in response["items"]:
                if not _load_playlist_item(item, artist_cache):
                    continue
                track_counter += 1
                if track_counter % 50 == 0:
                    now = datetime.now().strftime("%H:%M:%S")
                    print(f"[{now}] ✅ Processed {track_counter} tracks...")

            # Absent/None on the last page, which ends the loop.
            url = response.get("next")

        session.commit()
    except Exception:
        # A failed flush leaves the session unusable until it is rolled back;
        # since the session is shared across cells, always reset it here.
        session.rollback()
        raise

    print(f"✅ Finished loading {track_counter} tracks into the database.")


In [None]:
# def fetch_artist(artist_id):
#     url = f"https://api.spotify.com/v1/artists/{artist_id}"
#     try:
#         res = requests.get(url, headers=HEADERS)
#     if res.status_code == 429:
#         retry_after = int(res.headers.get("Retry-After", 1))
#         print(f"⏳ Rate limited. Waiting {retry_after} seconds before retrying artist fetch...")
#         time.sleep(retry_after)
#         res = requests.get(url, headers=HEADERS)
#         if res.status_code != 200:
#             print(f"⚠️ Request failed ({res.status_code}) for URL: {url}")
#             print("Response content:", res.text)
#             return None, []
#         response = res.json()
#     except ValueError:
#         print(f"⚠️ Failed to parse JSON from response for URL: {url}")
#         print("Raw response content:", res.text)
#         return None, []

#     if "error" in response or "id" not in response:
#         print(f"⚠️ Failed to fetch artist: {artist_id} — {response.get('error', {}).get('message', 'Unknown error')}")
#         return None, []

#     artist = Artist(
#         artist_id=response["id"],
#         name=response["name"],
#         popularity=response["popularity"],
#         followers=response["followers"]["total"]
#     )
#     genres = [ArtistGenre(artist_id=artist.artist_id, genre=genre) for genre in response.get("genres", [])]

#     time.sleep(0.1)
#     return artist, genres

# def fetch_audio_features(track_id):
#     url = f"https://api.spotify.com/v1/audio-features/{track_id}"
#     try:
#         res = requests.get(url, headers=HEADERS)
#         if res.status_code != 200:
#             print(f"⚠️ Request failed ({res.status_code}) for audio features URL: {url}")
#             print("Response content:", res.text)
#             return None
#         response = res.json()
#     except ValueError:
#         print(f"⚠️ Failed to parse JSON from response for audio features URL: {url}")
#         print("Raw response content:", res.text)
#         return None

#     if "id" in response:
#         return AudioFeatures(
#             track_id=response["id"],
#             danceability=response["danceability"],
#             energy=response["energy"],
#             loudness=response["loudness"],
#             speechiness=response["speechiness"],
#             acousticness=response["acousticness"],
#             instrumentalness=response["instrumentalness"],
#             liveness=response["liveness"],
#             valence=response["valence"],
#             tempo=response["tempo"]
#         )
#     return None

# def load_playlist_data(playlist_id):
#     processed_artists = set()
#     track_counter = 0
#     url = f"https://api.spotify.com/v1/playlists/{playlist_id}/tracks?limit=100"

#     while url:
#         response = requests.get(url, headers=HEADERS).json()

#         for item in response["items"]:
#             track_info = item.get("track")
#             if not track_info:
#                 continue

#             track_id = track_info["id"]
#             print(f"🎵 Processing track: {track_info.get('name', '[unknown]')} — {track_id}")

#             album_info = track_info["album"]
#             artist_objs = track_info["artists"]
#             release_date = parse_date(album_info.get("release_date"))

#             for i, artist_obj in enumerate(artist_objs):
#                 artist_id = artist_obj["id"]
#                 if artist_id in processed_artists:
#                     continue

#                 artist, genres = fetch_artist(artist_id)
#                 if artist:
#                     session.merge(artist)
#                     for genre in genres:
#                         session.merge(genre)
#                     processed_artists.add(artist_id)
#                 else:
#                     continue

#             album = Album(
#                 album_id=album_info["id"],
#                 title=album_info["name"],
#                 artist_id=artist_objs[0]["id"],
#                 release_date=release_date,
#                 album_type=album_info["album_type"],
#                 total_tracks=album_info["total_tracks"]
#             )
#             session.merge(album)

#             duration_ms = track_info["duration_ms"] or 0
#             title = track_info["name"] or "Unknown Title"

#             if duration_ms <= 0 or not title.strip():
#                 print(f"Skipping track {track_id} due to invalid duration or empty title.")
#                 continue

#             track = Track(
#                 track_id=track_id,
#                 title=title,
#                 album_id=album.album_id,
#                 duration_ms=duration_ms,
#                 explicit=track_info["explicit"],
#                 popularity=track_info.get("popularity"),
#                 release_date=release_date
#             )
#             session.merge(track)

#             for i, artist_obj in enumerate(artist_objs):
#                 artist, genres = fetch_artist(artist_obj["id"])
#                 if artist:
#                     session.merge(artist)
#                     for genre in genres:
#                         session.merge(genre)
#                 else:
#                     continue

#                 session.merge(TrackArtist(track_id=track_id, artist_id=artist.artist_id))

#                 if i > 0:
#                     session.merge(TrackCollaboration(
#                         track_id=track_id,
#                         primary_artist_id=artist_objs[0]["id"],
#                         featured_artist_id=artist.artist_id
#                     ))
        
#             track_url = f"https://api.spotify.com/v1/tracks/{track_id}"
#             try:
#                 track_res = requests.get(track_url, headers=HEADERS)
#                 if track_res.status_code != 200:
#                     print(f"⚠️ Request failed ({track_res.status_code}) for track URL: {track_url}")
#                     print("Response content:", track_res.text)
#                     continue
#                 track_full = track_res.json()
#             except ValueError:
#                 print(f"⚠️ Failed to parse JSON from response for track URL: {track_url}")
#                 print("Raw response content:", track_res.text)
#                 continue
#             for market in track_full.get("available_markets", []):
#                 session.merge(TrackMarket(track_id=track_id, market=market))

#             track_counter += 1
#             if track_counter % 50 == 0:
#                 now = datetime.now().strftime("%H:%M:%S")
#                 print(f"[{now}] ✅ Processed {track_counter} tracks...")

#         url = response.get("next")

#     session.commit()

In [14]:
# Run the script
if __name__ == "__main__":
    # Create any tables that do not exist yet, then ingest the playlist.
    Base.metadata.create_all(bind=engine)
    load_playlist_data(playlist_id="7c2c13pKxvFDSV4WSyydyg")
    print("Data loaded successfully")

🎵 Processing track: Poor Little Fool - 2001 Digital Remaster — 33FPsMEl3UwpytDuyf9VYq
🎵 Processing track: Nel Blu Dipinto Di Blu — 5zyrEv4F3FaLECI8TOKpFM
🎵 Processing track: Little Star — 3c7KT5CN8uYRaK3xThhdYt
🎵 Processing track: It's All In The Game — 01OFUZ8btJxFI6n5igQUqx
🎵 Processing track: It's Only Make Believe - Single Version — 6neFMH7Beu1uVcs3w65bNw
🎵 Processing track: Tom Dooley - Remastered — 5rivhNukBcqEX41XQDLYi9
🎵 Processing track: To Know Him Is To Love Him — 1nnUPkQWOcXl5OCKStXnls
🎵 Processing track: The Chipmunk Song (Christmas Don't Be Late) - Remastered 1999 — 02NKMA9cIkq6VuBNu9q9Wf
🎵 Processing track: Smoke Gets In Your Eyes — 1jTkRvUHQhh2v77G5KOyYW
🎵 Processing track: Stagger Lee — 4MUGG9mgDUP8dlaS3AAeg0
🎵 Processing track: Venus — 6feurIHW6vyqq9OGOieSVo
🎵 Processing track: Come Softly To Me — 603N4XGJUTbK760GLCvIIs
🎵 Processing track: The Happy Organ — 2ByaTRkrAnioKrKN07kyMt
🎵 Processing track: Kansas City — 0Z1UZtbNvJPXthNyvBXknL
🎵 Processing track: The Battle O