In [None]:
from airflow import DAG
from airflow.decorators import task
from airflow.models import Variable
from datetime import datetime
import spotipy
from spotipy.oauth2 import SpotifyOAuth
import snowflake.connector

# Set up Spotify API credentials
def spotify_auth():
    return spotipy.Spotify(auth_manager=spotipy.oauth2.SpotifyClientCredentials(
        client_id=Variable.get("spotify_client_id"),
        client_secret=Variable.get("spotify_client_secret")
    ))

# Set up Snowflake connection
def return_snowflake_conn():
    conn = snowflake.connector.connect(
        user=Variable.get("snowflake_userid"),
        password=Variable.get("snowflake_password"),
        account=Variable.get("snowflake_account"),
        warehouse="compute_wh",
        database="dev",
        schema="public"
    )
    return conn.cursor()

# Extract Spotify Data
@task
def extract_spotify_data():
    sp = spotify_auth()
    artist_name = Variable.get("spotify_artist", default_var="Taylor Swift")

    # Search for artist
    results = sp.search(q=f"artist:{artist_name}", type="artist", limit=1)
    if not results["artists"]["items"]:
        raise Exception(f"Artist '{artist_name}' not found on Spotify.")
    artist_id = results["artists"]["items"][0]["id"]

    # Get all albums and singles
    albums = []
    seen_album_names = set()
    results = sp.artist_albums(artist_id, album_type="album,single", limit=50)
    albums.extend(results["items"])
    while results["next"]:
        results = sp.next(results)
        albums.extend(results["items"])

    # Remove duplicate albums
    unique_albums = {album['name']: album for album in albums}.values()

    # Extract track info
    records = []
    for album in unique_albums:
        release_date = album.get("release_date")
        album_tracks = sp.album_tracks(album["id"])["items"]
        for track in album_tracks:
            track_info = sp.track(track["id"])
            records.append({
                "track_name": track_info["name"].replace("'", "''"),
                "artist_name": track_info["artists"][0]["name"].replace("'", "''"),
                "popularity": track_info["popularity"],
                "release_date": release_date,
                "duration_ms": track_info["duration_ms"]
            })
    return records


# Load data into Snowflake
@task
def load_to_snowflake(records):
    cur = return_snowflake_conn()
    try:
        cur.execute("BEGIN;")
        cur.execute("""
            CREATE TABLE IF NOT EXISTS top_tracks (
                track_name VARCHAR,
                artist_name VARCHAR,
                popularity INT,
                release_date DATE,
                duration_ms INT
            );
        """)
        cur.execute("DELETE FROM top_tracks;")

        for record in records:
            sql = f"""
                INSERT INTO top_tracks (track_name, artist_name, popularity, release_date, duration_ms)
                VALUES ('{record["track_name"]}', '{record["artist_name"]}', {record["popularity"]},
                        '{record["release_date"]}', {record["duration_ms"]});
            """
            cur.execute(sql)
        cur.execute("COMMIT;")
    except Exception as e:
        cur.execute("ROLLBACK;")
        raise e


# Define the DAG
with DAG(
    dag_id="spotify_popularity_prediction",
    start_date=datetime(2025, 2, 21),
    catchup=False,
    schedule="0 2 * * *",
    tags=["spotify", "etl"]
) as dag:
    spotify_data = extract_spotify_data()
    load_to_snowflake(spotify_data)


In [5]:
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials
from datetime import datetime, timedelta
import time

# Authenticate with Spotify
sp = spotipy.Spotify(auth_manager=SpotifyClientCredentials(
    client_id="055318a77fd540a08bd3fdf4837fdde5",
    client_secret="27ec822652584ba8ae791acd99a4ad68"
))

# Define date range
today = datetime.today()
start_date = today - timedelta(days=5)

for day_offset in range(6):
    date_obj = start_date + timedelta(days=day_offset)
    year_only = date_obj.year
    print(f"\n--- Querying for year: {year_only} on {date_obj.strftime('%Y-%m-%d')} ---")
    
    all_tracks = []
    offset = 0
    limit = 50
    max_results = 1000  # Spotify max
    total_retrieved = 0

    while True:
        try:
            result = sp.search(q=f"year:{year_only}", type="track", limit=limit, offset=offset)
            items = result['tracks']['items']
            if not items:
                break  # No more results
            all_tracks.extend(items)
            offset += limit
            total_retrieved += len(items)
            print(f"Retrieved {total_retrieved} tracks so far...")
            if offset >= max_results:
                break  # Respect Spotify's cap
            time.sleep(0.2)  # Be kind to the API
        except Exception as e:
            print(f"Error fetching results at offset {offset}: {e}")
            break

    print(f"✅ Total tracks fetched for year {year_only}: {len(all_tracks)}")



--- Querying for year: 2025 on 2025-04-27 ---
Retrieved 33 tracks so far...
Retrieved 83 tracks so far...
Retrieved 133 tracks so far...
Retrieved 183 tracks so far...
Retrieved 233 tracks so far...
Retrieved 283 tracks so far...
Retrieved 333 tracks so far...
Retrieved 383 tracks so far...
Retrieved 433 tracks so far...
Retrieved 483 tracks so far...
Retrieved 533 tracks so far...
Retrieved 583 tracks so far...
Retrieved 633 tracks so far...
Retrieved 683 tracks so far...
Retrieved 733 tracks so far...
Retrieved 783 tracks so far...
Retrieved 833 tracks so far...
Retrieved 883 tracks so far...
Retrieved 933 tracks so far...
Retrieved 983 tracks so far...
✅ Total tracks fetched for year 2025: 983

--- Querying for year: 2025 on 2025-04-28 ---
Retrieved 33 tracks so far...
Retrieved 83 tracks so far...
Retrieved 133 tracks so far...
Retrieved 183 tracks so far...
Retrieved 233 tracks so far...
Retrieved 283 tracks so far...
Retrieved 333 tracks so far...
Retrieved 383 tracks so far...


In [6]:
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials
from datetime import datetime, timedelta
import time

# Authenticate with Spotify
sp = spotipy.Spotify(auth_manager=SpotifyClientCredentials(
    client_id="055318a77fd540a08bd3fdf4837fdde5",
    client_secret="27ec822652584ba8ae791acd99a4ad68"
))

# Define date range
today = datetime.today()
start_date = today - timedelta(days=5)

for day_offset in range(6):
    date_obj = start_date + timedelta(days=day_offset)
    query_date = date_obj.strftime("%Y-%m-%d")
    year_only = date_obj.year
    print(f"\n--- Querying for year: {year_only} on {query_date} ---")

    # ---------------------------
    # Option 1: Year Query + Filtering
    # ---------------------------
    print("🟡 Option 1: Fetch year-wide tracks and filter by exact release date")

    all_tracks = []
    offset = 0
    limit = 50
    max_results = 1000
    total_retrieved = 0

    while True:
        try:
            result = sp.search(q=f"year:{year_only}", type="track", limit=limit, offset=offset)
            items = result['tracks']['items']
            if not items:
                break
            all_tracks.extend(items)
            offset += limit
            total_retrieved += len(items)
            if offset >= max_results:
                break
            time.sleep(0.2)
        except Exception as e:
            print(f"Error at offset {offset}: {e}")
            break

    filtered_tracks = [
        t for t in all_tracks
        if t["album"]["release_date"] == query_date
    ]
    print(f"🎯 Found {len(filtered_tracks)} track(s) released exactly on {query_date}")

    # ---------------------------
    # Option 2: Use 'tag:new' to bias toward recent
    # ---------------------------
    print("🔵 Option 2: Use 'tag:new' to try biasing toward recent releases")

    try:
        recent_tagged = sp.search(q=f"year:{year_only} tag:new", type="track", limit=10)
        for track in recent_tagged['tracks']['items']:
            print(f"  🎵 {track['name']} - {track['album']['release_date']}")
    except Exception as e:
        print(f"Error with tag:new search: {e}")

    # ---------------------------
    # Option 3: Use new_releases endpoint
    # ---------------------------
    print("🟢 Option 3: Use new_releases endpoint")

    try:
        new_releases = sp.new_releases(limit=10)
        for album in new_releases['albums']['items']:
            print(f"  💿 {album['name']} by {album['artists'][0]['name']} - Released: {album['release_date']}")
    except Exception as e:
        print(f"Error fetching new releases: {e}")



--- Querying for year: 2025 on 2025-04-27 ---
🟡 Option 1: Fetch year-wide tracks and filter by exact release date
🎯 Found 0 track(s) released exactly on 2025-04-27
🔵 Option 2: Use 'tag:new' to try biasing toward recent releases
🟢 Option 3: Use new_releases endpoint
  💿 THE TORTURED POETS DEPARTMENT by Taylor Swift - Released: 2024-04-18
  💿 Dark Matter by Pearl Jam - Released: 2024-04-19
  💿 HERicane by Lucky Daye - Released: 2024-04-19
  💿 Teka (with Peso Pluma) by DJ Snake - Released: 2024-04-17
  💿 REBEL by Anne Wilson - Released: 2024-04-19
  💿 Define My Name by Nas - Released: 2024-04-19
  💿 Wasteland, Baby! (Special Edition) by Hozier - Released: 2019-03-01
  💿 Baddy On The Floor by Jamie xx - Released: 2024-04-15
  💿 For Life (feat. Nile Rodgers) by Kygo - Released: 2024-04-19
  💿 Espectacular by Sky Rompiendo - Released: 2024-04-18

--- Querying for year: 2025 on 2025-04-28 ---
🟡 Option 1: Fetch year-wide tracks and filter by exact release date
🎯 Found 2 track(s) released exac