# Grab Spotify usage data from REST API

- Purpose: 
    - query timestamp of latest data point from ETL table,
    - obtain a service principal OAuth token,
    - For each day from last point until today: gather transactional JSON data (paginated),
    - concat and process data,
    - insert data to interface table,
    - add loading success/failure and stats to etl table.
- Author: vsm
- Date: 2025-07-08
- Team: GS - BI/ERP

## Requirements

1. Update package lists
`sudo apt-get update`

2. Install unixODBC and PostgreSQL ODBC driver
`sudo apt-get install -y unixodbc unixodbc-dev odbc-postgresql`

3. Optional: also install PostgreSQL client utilities
`sudo apt-get install -y postgresql-client`



### Load Parameters and modules

In [1]:
import requests
import json
import hashlib
import pyodbc
import base64
import urllib.parse
from datetime import datetime, timedelta, timezone

from dotenv import dotenv_values

import sys
import logging

# Clear handlers left on the root logger so the basicConfig() call below takes
# effect (basicConfig is a no-op when the root logger already has handlers).
for handler in list(logging.root.handlers):
    logging.root.removeHandler(handler)

logging.basicConfig(
    stream=sys.stdout,
    level=logging.INFO,
    format="%(asctime)s %(message)s",
)

# All settings are read from a local dotenv file; a missing key raises KeyError here.
env_dict = dotenv_values("./.env_spotify")

# API client credentials and endpoint URLs.
API_CREDS = {
    key: env_dict[key]
    for key in (
        "CLIENT_ID",
        "CLIENT_SECRET",
        "SCOPE",
        "REDIRECT_URI",
        "OAUTH_URL",
        "REST_URL",
    )
}

# Service name, staging/log table names and the JSON key holding the event timestamp.
BI_META = {
    key: env_dict[key]
    for key in (
        "BI_SERVICE_NAME",
        "BI_STAGING_TABLE",
        "BI_LOG_TABLE",
        "BI_INGEST_TS",
    )
}

# PostgreSQL connection settings.
SQL_CREDS = {
    key: env_dict[key]
    for key in (
        "PG_SERVER",
        "PG_PORT",
        "PG_DB",
        "PG_SCHEMA",
        "PG_USER",
        "PG_PWD",
    )
}

logging.info("Loading Modules")


2025-07-08 22:05:59,426 Loading Modules


## Authorization Code Flow Overview

1. Redirect user to Spotify login & consent screen.

2. Spotify redirects back with code → exchange this for an access_token and refresh_token.

3. Use the access_token to call APIs on behalf of the user.

4. Use the refresh_token to get a new access token when expired.

### 1. Generate Auth URL & Open It

In [None]:
# Query parameters of the authorization request (authorization-code flow).
params = dict(
    client_id=API_CREDS['CLIENT_ID'],
    response_type="code",
    redirect_uri=API_CREDS['REDIRECT_URI'],
    scope=API_CREDS['SCOPE'],
)

# Full consent-screen URL the user has to open in a browser.
auth_url = f"https://accounts.spotify.com/authorize?{urllib.parse.urlencode(params)}"
print("Go to the following URL and authorize the app:", auth_url, sep="\n")

# Optional: open it automatically
#import webbrowser
#webbrowser.open(auth_url)

### 2. Manually Paste the Redirect URL and get Code from callback URL

In [None]:
# Ask for the URL the browser ended up on, then read the first `code` query
# parameter from it (raises KeyError when the URL carries no `code`).
redirected_url = input("Paste the full redirect URL here: ")
query_params = urllib.parse.parse_qs(urllib.parse.urlparse(redirected_url).query)
code = query_params['code'][0]
print(f"Auth code: {code}")


### 3. Exchange Code for Tokens

In [None]:
import base64

def get_tokens(code):
    """Exchange an authorization code for tokens.

    Sends an ``authorization_code`` grant to the Spotify token endpoint,
    authenticating with HTTP Basic auth built from the client id and secret
    in ``API_CREDS``.

    Args:
        code: Authorization code taken from the redirect URL.

    Returns:
        The decoded JSON body of the token response.

    Raises:
        requests.HTTPError: If the endpoint answers with an error status.
    """
    client_pair = f"{API_CREDS['CLIENT_ID']}:{API_CREDS['CLIENT_SECRET']}"
    encoded_pair = base64.b64encode(client_pair.encode()).decode()

    response = requests.post(
        "https://accounts.spotify.com/api/token",
        headers={
            "Authorization": f"Basic {encoded_pair}",
            "Content-Type": "application/x-www-form-urlencoded",
        },
        data={
            "grant_type": "authorization_code",
            "code": code,
            "redirect_uri": API_CREDS['REDIRECT_URI'],
        },
    )
    response.raise_for_status()
    return response.json()

# Run the exchange and keep both tokens at notebook level for later cells.
tokens = get_tokens(code)
access_token = tokens["access_token"]
refresh_token = tokens["refresh_token"]

# NOTE(review): both secrets end up in the saved cell output; clear the output
# before sharing or committing the notebook.
print(f"Access token: {access_token}")
print(f"Refresh token: {refresh_token}")

### 4. Refresh access token

In [None]:
# Resets the notebook-level refresh token to an empty string; presumably a
# placeholder where a stored refresh token is pasted in (confirm).
refresh_token = ''


def refresh_access_token(refresh_token):
    """Obtain a new access token from a refresh token.

    Sends a ``refresh_token`` grant to the Spotify token endpoint,
    authenticating with HTTP Basic auth built from the client id and secret
    in ``API_CREDS``.

    Args:
        refresh_token: Refresh token to redeem.

    Returns:
        The ``access_token`` value of the JSON response.

    Raises:
        requests.HTTPError: If the endpoint answers with an error status.
    """
    client_pair = f"{API_CREDS['CLIENT_ID']}:{API_CREDS['CLIENT_SECRET']}"
    encoded_pair = base64.b64encode(client_pair.encode()).decode()

    response = requests.post(
        "https://accounts.spotify.com/api/token",
        headers={
            "Authorization": f"Basic {encoded_pair}",
            "Content-Type": "application/x-www-form-urlencoded",
        },
        data={
            "grant_type": "refresh_token",
            "refresh_token": refresh_token,
        },
    )
    response.raise_for_status()
    payload = response.json()
    return payload["access_token"]

In [None]:
# Redeem the notebook-level refresh token; the returned access token is only
# displayed as the cell result, it is not assigned to a variable.
refresh_access_token(refresh_token)

### Make a test call

In [None]:
# Resets the notebook-level access token to an empty string; presumably a
# placeholder where a valid user access token is pasted in (confirm).
access_token = ''

# Smoke test: ask for the five most recently played tracks.
res = requests.get(
    "https://api.spotify.com/v1/me/player/recently-played?limit=5",
    headers={"Authorization": f"Bearer {access_token}"}
)
# Fail loudly on an HTTP error status (as every other request in this notebook
# does) instead of silently printing nothing because the error body has no "items".
res.raise_for_status()

# Print "<n>. <track name> – <first artist>" for every returned item.
for i, item in enumerate(res.json().get("items", [])):
    track = item['track']
    print(f"{i+1}. {track['name']} – {track['artists'][0]['name']}")

1. Rope Ends – Pain of Salvation
2. Lake of Fire – Meat Puppets
3. Song to the Siren – Tim Buckley
4. Look At The Fool - Remastered – Tim Buckley
5. Grace – Jeff Buckley


In [6]:
# Fixed reference date. The datetime is naive, so .timestamp() interprets it
# in the local timezone of the machine running the notebook.
dt = datetime(year=2025, month=6, day=30)

# The same instant expressed as epoch milliseconds.
after = int(1000 * dt.timestamp())

In [24]:
# === Postgres Connection ===
def get_sql_conn():
    """Open a pyodbc connection to the PostgreSQL database described by ``SQL_CREDS``.

    Returns:
        An open ``pyodbc.Connection``.

    Raises:
        pyodbc.Error: If the driver cannot be loaded or the connection fails.
    """
    logging.info("Connecting to DB")
    # The PostgreSQL ODBC driver takes host and port as separate keywords.
    # "SERVER=host,port" is SQL Server syntax and must not be used here.
    # NOTE(review): the DRIVER name has to match an entry in odbcinst.ini
    # (the Debian package may register "PostgreSQL Unicode"/"PostgreSQL ANSI") - confirm.
    conn_str = (
        "DRIVER={PostgreSQL};"
        f"SERVER={SQL_CREDS['PG_SERVER']};"
        f"PORT={SQL_CREDS['PG_PORT']};"
        f"DATABASE={SQL_CREDS['PG_DB']};"
        f"UID={SQL_CREDS['PG_USER']};"
        f"PWD={SQL_CREDS['PG_PWD']}"
    )
    return pyodbc.connect(conn_str)

# === Get latest timestamp ===
def get_latest_timestamp(conn):
    """Return the newest ``event_time`` found in the staging table.

    The query itself substitutes '2025-06-30' when the table has no
    ``event_time`` values; the same date is returned as a ``datetime`` when
    the query yields no row at all.

    Args:
        conn: Open database connection providing ``cursor()``.

    Returns:
        The first column of the single result row, or
        ``datetime(2025, 6, 30)`` if no row came back.
    """
    logging.info("Getting maximum event timestamp")
    cursor = conn.cursor()
    staging_table = f"{SQL_CREDS['PG_SCHEMA']}.{BI_META['BI_STAGING_TABLE']}"
    cursor.execute(f"SELECT COALESCE(MAX(event_time), '2025-06-30') FROM {staging_table}")
    row = cursor.fetchone()
    if not row:
        return datetime(2025, 6, 30)
    return row[0]

# === Get Token via Client Credentials Flow ===
# === Note: this will not suffice for personal data
def get_access_token(timeout=30):
    """Fetch an app-level access token via the client-credentials grant.

    Posts to ``API_CREDS['OAUTH_URL']`` with HTTP Basic auth built from the
    client id and secret in ``API_CREDS``.

    Args:
        timeout: Seconds to wait for the token endpoint before giving up.
            Without a timeout an unresponsive server would block the ETL
            run indefinitely.

    Returns:
        The ``access_token`` value of the JSON response.

    Raises:
        requests.HTTPError: If the endpoint answers with an error status.
        requests.Timeout: If the endpoint does not answer within ``timeout``.
    """
    logging.info("Getting OAuth")

    auth_str = f"{API_CREDS['CLIENT_ID']}:{API_CREDS['CLIENT_SECRET']}"
    b64_auth_str = base64.b64encode(auth_str.encode()).decode()

    headers = {
        'Authorization': f'Basic {b64_auth_str}',
        'Content-Type': 'application/x-www-form-urlencoded',
    }
    data = {'grant_type': 'client_credentials'}

    res = requests.post(
        API_CREDS['OAUTH_URL'],
        headers=headers,
        data=data,
        timeout=timeout,
    )
    res.raise_for_status()

    return res.json()['access_token']

# === Fetch paginated spotify usage data ===
def fetch_usage_data(timestamp_ms, token, timeout=30):
    """Collect all recently-played items after a cursor, following pagination.

    Requests ``{REST_URL}/me/player/recently-played`` with ``after`` and
    ``limit`` query parameters, then keeps requesting the ``next`` URL from
    each response until a response carries none.

    Args:
        timestamp_ms: Value sent as the ``after`` query parameter.
        token: Bearer token placed in the ``Authorization`` header.
        timeout: Seconds to wait for each page before giving up. Without a
            timeout an unresponsive server would block the run indefinitely.

    Returns:
        A list with the ``items`` of every page, in the order received.

    Raises:
        requests.HTTPError: If any page answers with an error status.
        requests.Timeout: If a page does not answer within ``timeout``.
    """
    logging.info("Getting REST Data")
    limit = 20
    rows = []

    headers = {"Authorization": f"Bearer {token}"}
    url = f"{API_CREDS['REST_URL']}/me/player/recently-played"
    # Query parameters are sent with the first request only; follow-up pages
    # use the `next` URL exactly as returned by the API.
    params = {"after": timestamp_ms, "limit": limit}
    counter = 1

    while url:
        logging.info(f"Page_{counter:02d}")
        resp = requests.get(url, headers=headers, params=params, timeout=timeout)
        resp.raise_for_status()
        data = resp.json()
        rows.extend(data.get("items", []))
        url = data.get("next")  # missing/None ends the loop
        params = None
        counter += 1

    return rows

# === Sanitize JSON row ===
def remove_available_markets(rows):
    """Drop the ``available_markets`` entries from every item, in place.

    For each item the key is removed from ``item["track"]`` and from
    ``item["track"]["album"]`` when present. Items whose ``track`` or
    ``album`` is missing or ``None`` are left untouched instead of raising.

    Args:
        rows: Iterable of item dicts; the dicts are modified in place.

    Returns:
        None.
    """
    for item in rows:
        # `or {}` also covers a key that is present but null, which
        # `.get(key, {})` would hand back as None.
        track = item.get("track") or {}
        track.pop("available_markets", None)

        album = track.get("album") or {}
        album.pop("available_markets", None)

# === Hash JSON row ===
def hash_row(row):
    """Return the SHA-256 hex digest of a JSON-serialisable row.

    The row is serialised with sorted keys, so two dicts with the same
    content hash identically regardless of key order.

    Args:
        row: Any object ``json.dumps`` can serialise.

    Returns:
        The 64-character hexadecimal digest.
    """
    canonical = json.dumps(row, sort_keys=True)
    digest = hashlib.sha256()
    digest.update(canonical.encode())
    return digest.hexdigest()

# === Insert new data (with hashdiff) ===
def _parse_utc_timestamp(value):
    """Parse an ISO-8601 string into a timezone-aware UTC datetime.

    A trailing ``Z`` is rewritten to ``+00:00`` because
    ``datetime.fromisoformat`` only accepts it from Python 3.11 on.
    Strings without an offset are assumed to be UTC.

    Raises:
        ValueError: If *value* is not a string or cannot be parsed.
    """
    if not isinstance(value, str):
        raise ValueError(f"expected ISO-8601 string, got {type(value).__name__}")
    text = value[:-1] + "+00:00" if value.endswith(("Z", "z")) else value
    parsed = datetime.fromisoformat(text)
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)


def insert_new_data(conn, rows):
    """Insert rows that are not yet in the staging table and commit.

    Each row is identified by its ``hash_row`` digest; rows whose hash is
    already stored are counted as duplicates and skipped. Rows without the
    ``BI_META['BI_INGEST_TS']`` key, or with a timestamp that cannot be
    parsed, are skipped with a warning.

    Args:
        conn: Open database connection providing ``cursor()`` and ``commit()``.
        rows: Iterable of JSON-serialisable dicts.

    Returns:
        A tuple ``(inserted_count, max_tf_ts)``. ``max_tf_ts`` is the latest
        event timestamp among the inserted rows as a timezone-aware UTC
        datetime, or 1900-01-01 UTC when nothing was inserted.
    """
    logging.info("Inserting relevant data to DB")
    inserted_count = 0
    duplicate_count = 0

    ts_key = BI_META['BI_INGEST_TS']
    table = f"{SQL_CREDS['PG_SCHEMA']}.{BI_META['BI_STAGING_TABLE']}"

    cursor = conn.cursor()

    # The sentinel is timezone-aware like the parsed timestamps; comparing a
    # naive sentinel with an aware timestamp raises TypeError.
    max_tf_ts = datetime(1900, 1, 1, tzinfo=timezone.utc)

    for row_no, row in enumerate(rows, start=1):
        tf_ts_str = row.get(ts_key)
        if not tf_ts_str:
            logging.warning(f"JSON payload does not contain {ts_key} key")
            continue  # skip if missing transaction timestamp

        try:
            tf_ts = _parse_utc_timestamp(tf_ts_str)
        except ValueError:
            # Report the position in the input, not the running insert count.
            logging.warning(f"Row:{row_no}: Timestamp {ts_key} malformed")
            continue  # skip malformed timestamp

        hash_val = hash_row(row)
        data_str = json.dumps(row)

        # Skip duplicates
        cursor.execute(f"SELECT 1 FROM {table} WHERE hash = ?", (hash_val,))
        if cursor.fetchone():
            duplicate_count += 1
            continue

        # Insert new record
        cursor.execute(
            f"INSERT INTO {table} (event_time, data_json, hash) VALUES (?, ?, ?)",
            (tf_ts, data_str, hash_val)
        )
        inserted_count += 1

        # Track the newest inserted event timestamp
        if tf_ts > max_tf_ts:
            max_tf_ts = tf_ts

    conn.commit()
    logging.info(f"{inserted_count} rows inserted to DB, {duplicate_count} duplicates skipped")
    return inserted_count, max_tf_ts

# === Log ETL run result ===
def log_etl_result(conn, success, inserted_rows, max_ts):
    """Write one row describing an ETL run to the log table and commit.

    Args:
        conn: Open database connection providing ``cursor()`` and ``commit()``.
        success: Value stored in the ``success`` column.
        inserted_rows: Value stored in the ``inserted_rows`` column.
        max_ts: Value stored in the ``max_event_time`` column.

    The ``run_time`` column receives the current UTC time and
    ``service_name`` receives ``BI_META['BI_SERVICE_NAME']``.
    """
    logging.info("Logging ETL run to DB")
    cursor = conn.cursor()
    log_table = f"{SQL_CREDS['PG_SCHEMA']}.{BI_META['BI_LOG_TABLE']}"
    statement = (
        f"INSERT INTO {log_table} "
        "(run_time, service_name, success, inserted_rows, max_event_time) "
        "VALUES (?, ?, ?, ?, ?)"
    )
    values = (
        datetime.now(timezone.utc),
        BI_META['BI_SERVICE_NAME'],
        success,
        inserted_rows,
        max_ts,
    )
    cursor.execute(statement, values)
    conn.commit()

In [36]:
import time

# Cursor for the fetch: 600 hours (25 days) before now, in epoch milliseconds.
lookback_ms = 600 * 60 * 60 * 1000
timestamp_ms = int(time.time() * 1000) - lookback_ms

rows = fetch_usage_data(timestamp_ms, access_token)


2025-07-08 22:48:59,105 Getting REST Data
2025-07-08 22:48:59,109 Page_01
2025-07-08 22:48:59,541 Page_02
2025-07-08 22:49:02,501 Page_03
2025-07-08 22:49:02,819 Page_04


In [37]:
# Strip the `available_markets` entries from the fetched rows in place.
remove_available_markets(rows)

In [22]:
# Display the cursor value (epoch milliseconds) computed above.
timestamp_ms

1751790440647

In [39]:
# List every fetched play as "<n>. <played_at>: <track name> – <first artist>".
for position, row in enumerate(rows, start=1):
    track = row['track']
    print(f"{position}. {row.get('played_at')}: {track['name']} – {track['artists'][0]['name']}")

1. 2025-07-02T19:25:34.902Z: Movement – LCD Soundsystem
2. 2025-07-02T19:21:51.542Z: Slice Paper Wrists – Poison The Well
3. 2025-07-02T19:17:55.546Z: Dragula – Rob Zombie
4. 2025-07-02T19:14:12.345Z: 5 Minutes Alone – Pantera
5. 2025-07-02T19:08:25.391Z: L'enfant sauvage – Gojira
6. 2025-07-02T19:03:35.297Z: Save Me – Damageplan
7. 2025-07-02T19:00:00.793Z: Fixation on the Darkness – Killswitch Engage
8. 2025-07-02T18:55:56.476Z: Surreal Atrocites – A Life Once Lost
9. 2025-07-02T18:36:56.972Z: Frankenstein – Clutch
10. 2025-07-02T18:31:16.197Z: Love? – Strapping Young Lad
11. 2025-07-02T18:26:21.706Z: Sirens – Samavayo
12. 2025-07-02T18:21:13.456Z: Diana – Chelsea Wolfe
13. 2025-07-02T18:16:55.576Z: Cabin Fever – Slomosa
14. 2025-07-02T18:11:04.980Z: Siberian Kiss - 2009 Remaster – Glassjaw
15. 2025-07-02T18:07:15.155Z: Tip Your Bartender – Glassjaw
16. 2025-07-02T18:02:56.035Z: Broken Man – St. Vincent
17. 2025-07-02T17:59:32.474Z: Carbonara – Spliff
18. 2025-07-02T17:54:43.074Z: Ma

In [40]:
# Display how many items were fetched in total.
len(rows)

50

In [None]:
# === Main ETL logic ===
def _as_utc(ts):
    """Return *ts* as a timezone-aware UTC datetime (naive input is assumed to be UTC)."""
    if ts.tzinfo is None:
        return ts.replace(tzinfo=timezone.utc)
    return ts.astimezone(timezone.utc)


def run_etl():
    """Run one incremental load from the REST API into the staging table.

    Steps:
        1. Read the newest ``event_time`` from the staging table.
        2. Obtain an access token.
        3. Fetch everything after that timestamp. ``fetch_usage_data`` takes
           an epoch-millisecond cursor and follows the ``next`` links itself,
           so a single call replaces a per-day loop.
        4. Strip ``available_markets`` and insert rows that are not stored yet.
        5. Write a success or failure row to the ETL log table.

    Exceptions raised during steps 2-5 are logged and recorded as a failed
    run; they are not re-raised. The connection is always closed.
    """
    conn = get_sql_conn()
    try:
        # Normalise to aware UTC so later comparisons never mix naive and
        # aware datetimes.
        # NOTE(review): assumes a naive value from the DB is UTC - confirm column type.
        latest_ts = _as_utc(get_latest_timestamp(conn))
        logging.info(f"Latest timestamp is: {latest_ts}")

        inserted_total = 0
        max_data_ts = latest_ts

        try:
            # NOTE(review): get_access_token() uses the client-credentials
            # grant; the user-scoped recently-played endpoint presumably needs
            # a token from the authorization-code flow (refresh_access_token) - confirm.
            token = get_access_token()

            # fetch_usage_data expects (timestamp_ms, token).
            after_ms = int(latest_ts.timestamp() * 1000)
            logging.info(f"Extracting data after {latest_ts.isoformat()} (cursor: {after_ms} ms)")
            rows = fetch_usage_data(after_ms, token)

            if rows:
                # Sanitise before hashing, as done in the manual cells of this
                # notebook. Presumably this keeps the row hash independent of
                # the available_markets lists (confirm).
                remove_available_markets(rows)
                inserted, max_tf_ts = insert_new_data(conn, rows)
                inserted_total += inserted
                if max_tf_ts and _as_utc(max_tf_ts) > max_data_ts:
                    max_data_ts = _as_utc(max_tf_ts)

            log_etl_result(conn, True, inserted_total, max_data_ts)
            logging.info(f"✅ Success: Inserted {inserted_total} records.")

        except Exception as e:
            # A failed statement leaves a PostgreSQL transaction aborted; roll
            # back first so the failure row below can actually be written.
            conn.rollback()
            log_etl_result(conn, False, inserted_total, max_data_ts)
            # logging.exception keeps the traceback next to the message.
            logging.exception(f"❌ Failure: {e}")

    finally:
        conn.close()

In [None]:
# Execute one ETL run; progress and the outcome are written via logging.
run_etl()