# Grab Spotify usage data from the REST API

- Purpose: 
    - query the timestamp of the latest data point from the staging table,
    - obtain a user OAuth access token (Spotify Authorization Code flow),
    - gather all recently-played JSON data after that timestamp (paginated),
    - concatenate and clean the data,
    - insert new (de-duplicated) rows into the staging table,
    - record load success/failure and statistics in the ETL log table.
- Author: vsm
- Date: 2025-07-08
- Team: GS - BI/ERP

## Requirements

1. Update package lists
`sudo apt-get update`

2. Install unixODBC and PostgreSQL ODBC driver
`sudo apt-get install -y unixodbc unixodbc-dev odbc-postgresql`

3. Optional: also install PostgreSQL client utilities
`sudo apt-get install -y postgresql-client`

4. Install the Python packages used by this notebook
`pip install requests pyodbc python-dotenv`



### Load Parameters and modules

In [None]:
import requests
import json
import hashlib
import pyodbc
import base64
import urllib.parse
from datetime import datetime, timedelta, timezone, time

from dotenv import dotenv_values

import sys
import logging

# Remove handlers left behind by earlier executions of this cell so each
# log message is printed only once.
for handler in logging.root.handlers[:]:
    logging.root.removeHandler(handler)

logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(asctime)s %(message)s")

env_dict = dotenv_values("./.env")


def _env_section(mapping):
    """Build a settings dict from ``{setting_name: env_key}`` pairs, failing fast on gaps.

    Args:
        mapping: ordered dict of output key -> name of the variable in ``.env``.

    Returns:
        dict with the same keys as ``mapping``, values taken from ``env_dict``.

    Raises:
        KeyError: naming every listed variable that is absent from ``.env`` or has
            no value (``dotenv_values`` maps a bare ``KEY`` line to ``None``).
    """
    missing = [env_key for env_key in mapping.values() if env_dict.get(env_key) is None]
    if missing:
        raise KeyError(f"Missing .env setting(s): {', '.join(missing)}")
    return {name: env_dict[env_key] for name, env_key in mapping.items()}


# Spotify application credentials and endpoints.
API_CREDS = _env_section({
    "CLIENT_ID": "SPOTIFY_CLIENT_ID",
    "CLIENT_SECRET": "SPOTIFY_CLIENT_SECRET",
    "SCOPE": "SPOTIFY_SCOPE",
    "REDIRECT_URI": "SPOTIFY_REDIRECT_URI",
    "OAUTH_URL": "SPOTIFY_OAUTH_URL",
    "REST_URL": "SPOTIFY_REST_URL",
})

# BI metadata: service name, target tables and the JSON key holding the event timestamp.
BI_META = _env_section({
    "BI_SERVICE_NAME": "BI_SERVICE_NAME",
    "BI_STAGING_TABLE": "BI_STAGING_TABLE",
    "BI_LOG_TABLE": "BI_LOG_TABLE",
    "BI_INGEST_TS": "BI_INGEST_TS",
})

# PostgreSQL connection settings.
SQL_CREDS = _env_section({
    "PG_SERVER": "PG_SERVER",
    "PG_PORT": "PG_PORT",
    "PG_DB": "PG_DB",
    "PG_SCHEMA": "PG_SCHEMA",
    "PG_USER": "PG_USER",
    "PG_PWD": "PG_PWD",
})

logging.info("Loading Modules")


## Authorization Code Flow Overview

1. Redirect user to Spotify login & consent screen.

2. Spotify redirects back with code → exchange this for an access_token and refresh_token.

3. Use the access_token to call APIs on behalf of the user.

4. Use the refresh_token to get a new access token when expired.

### 1. Generate Auth URL & Open It

In [None]:
# Query parameters for Spotify's consent screen (Authorization Code flow).
params = dict(
    client_id=API_CREDS['CLIENT_ID'],
    response_type="code",
    redirect_uri=API_CREDS['REDIRECT_URI'],
    scope=API_CREDS['SCOPE'],
)

auth_url = "https://accounts.spotify.com/authorize?{}".format(urllib.parse.urlencode(params))

logging.info("Go to the following URL and authorize the app:")
logging.info(auth_url)

# To open the consent page automatically instead, run:
#   import webbrowser; webbrowser.open(auth_url)

### 2. Paste the redirect URL manually and extract the authorization code

In [None]:
# Paste the full URL the browser was redirected to; the authorization code
# is carried in its query string.
redirected_url = input("Paste the full redirect URL here: ").strip()
_redirect_query = urllib.parse.parse_qs(urllib.parse.urlparse(redirected_url).query)

if "error" in _redirect_query:
    # On refusal or failure Spotify returns ?error=<reason> (e.g. access_denied) instead of a code.
    raise ValueError(f"Spotify authorization failed: {_redirect_query['error'][0]}")
if "code" not in _redirect_query:
    raise ValueError("Redirect URL does not contain a 'code' query parameter")

code = _redirect_query["code"][0]
# The code is a one-time credential: log only a short prefix, never the full value.
logging.info(f"Auth code: {code[:6]}...")


### 3. Exchange Code for Tokens

In [None]:
def get_tokens(code, timeout=30):
    """Exchange a one-time authorization code for Spotify access/refresh tokens.

    Args:
        code: the ``code`` query parameter from the OAuth redirect URL.
        timeout: seconds to wait for the token endpoint before giving up.

    Returns:
        The decoded JSON token response (contains ``access_token`` and
        ``refresh_token`` among other fields).

    Raises:
        requests.HTTPError: if the token endpoint answers with a non-2xx status.
    """
    token_url = "https://accounts.spotify.com/api/token"

    # Client authentication via HTTP Basic: base64("client_id:client_secret").
    auth_header = base64.b64encode(f"{API_CREDS['CLIENT_ID']}:{API_CREDS['CLIENT_SECRET']}".encode()).decode()
    headers = {
        "Authorization": f"Basic {auth_header}",
        "Content-Type": "application/x-www-form-urlencoded"
    }

    data = {
        "grant_type": "authorization_code",
        "code": code,
        # Must match the redirect_uri used when building the authorize URL.
        "redirect_uri": API_CREDS['REDIRECT_URI'],
    }

    # Without a timeout requests can block indefinitely on a stalled connection.
    res = requests.post(token_url, headers=headers, data=data, timeout=timeout)
    res.raise_for_status()
    return res.json()


def _mask_secret(secret, keep=6):
    """Return a short, non-reusable preview of a credential for log output."""
    return f"{secret[:keep]}..." if secret else "<none>"


tokens = get_tokens(code)
access_token = tokens["access_token"]
refresh_token = tokens["refresh_token"]

# Never log bearer/refresh tokens in full: anyone who can read the log could
# call the API as this user.
logging.info(f"Access token: {_mask_secret(access_token)}")
logging.info(f"Refresh token: {_mask_secret(refresh_token)}")

### 4. Make a test call and refresh the access token if needed

In [None]:
# Usage (assumes access_token and refresh_token are valid and initialized)
# recent_tracks = get_recently_played(access_token, refresh_token)

def refresh_access_token(refresh_token, timeout=30):
    """Trade a refresh token for a new short-lived access token.

    Args:
        refresh_token: the refresh token returned by ``get_tokens``.
        timeout: seconds to wait for the token endpoint before giving up.

    Returns:
        The new access token string.

    Raises:
        requests.HTTPError: if the token endpoint answers with a non-2xx status.

    NOTE(review): the token response may also contain a rotated
    ``refresh_token``; it is discarded here. Confirm whether callers need it.
    """
    token_url = "https://accounts.spotify.com/api/token"
    # Client authentication via HTTP Basic: base64("client_id:client_secret").
    auth_header = base64.b64encode(f"{API_CREDS['CLIENT_ID']}:{API_CREDS['CLIENT_SECRET']}".encode()).decode()

    headers = {
        "Authorization": f"Basic {auth_header}",
        "Content-Type": "application/x-www-form-urlencoded"
    }

    data = {
        "grant_type": "refresh_token",
        "refresh_token": refresh_token
    }

    # Without a timeout requests can block indefinitely on a stalled connection.
    res = requests.post(token_url, headers=headers, data=data, timeout=timeout)
    res.raise_for_status()
    return res.json()["access_token"]


def get_recently_played(access_token, refresh_token, limit=5, timeout=30):
    """Fetch and log the user's most recently played tracks, refreshing the token once if expired.

    Args:
        access_token: user access token from the Authorization Code flow.
        refresh_token: refresh token used if the API reports the access token expired.
        limit: number of items requested (sent as the ``limit`` query parameter).
        timeout: seconds to wait for each HTTP request.

    Returns:
        The decoded JSON response body.

    Raises:
        Exception: if the final response status is not 200.

    NOTE(review): a refreshed access token is used for the retry only and is not
    returned, so later cells keep using the old token.
    """
    url = "https://api.spotify.com/v1/me/player/recently-played"

    def fetch(token):
        # Let requests encode the query string instead of formatting it by hand.
        return requests.get(
            url,
            headers={"Authorization": f"Bearer {token}"},
            params={"limit": limit},
            timeout=timeout,
        )

    def token_expired(response):
        # Parse the error body defensively: it may be non-JSON, or carry a plain
        # string under "error" rather than an object with a "message".
        try:
            body = response.json()
        except ValueError:
            return False
        error = body.get("error") if isinstance(body, dict) else None
        return isinstance(error, dict) and error.get("message", "") == "The access token expired"

    res = fetch(access_token)

    # Handle expired token
    if res.status_code == 401 and token_expired(res):
        logging.info("Access token expired. Refreshing...")
        res = fetch(refresh_access_token(refresh_token))

    # Still failed after refresh
    if res.status_code != 200:
        raise Exception(f"Spotify API error: {res.status_code} – {res.text}")

    payload = res.json()
    for i, item in enumerate(payload.get("items", []), start=1):
        track = item["track"]
        logging.info(f"{i}. {track['name']} – {track['artists'][0]['name']}")

    return payload

res = get_recently_played(access_token, refresh_token)

### 5. Define and run Spotify ETL

In [None]:
# === Postgres Connection ===
def get_sql_conn():
    """Open a pyodbc connection to the ETL PostgreSQL database.

    Uses the ODBC driver registered as "PostgreSQL Unicode" and the host, port,
    database and credentials from ``SQL_CREDS``. The schema is not set here;
    queries elsewhere qualify table names with ``PG_SCHEMA``.

    Returns:
        An open ``pyodbc.Connection`` (autocommit off; callers commit).
    """
    logging.info("Connecting to DB")
    conn_str = ";".join([
        "DRIVER={PostgreSQL Unicode}",
        f"SERVER={SQL_CREDS['PG_SERVER']}",
        # psqlODBC takes the port as its own keyword; the "host,port" form is
        # SQL Server syntax and would leave a non-default port ignored.
        f"PORT={SQL_CREDS['PG_PORT']}",
        f"DATABASE={SQL_CREDS['PG_DB']}",
        f"UID={SQL_CREDS['PG_USER']}",
        f"PWD={SQL_CREDS['PG_PWD']}",
    ])
    return pyodbc.connect(conn_str)

# === Get latest timestamp ===
def get_latest_timestamp(conn):
    """Return the newest ``event_time`` already stored in the staging table.

    The SQL ``COALESCE`` substitutes 2025-06-30 when the table is empty; the
    Python fallback (a naive ``datetime(2025, 6, 30)``) only applies if the
    cursor yields no row at all.
    """
    staging_table = f"{SQL_CREDS['PG_SCHEMA']}.{BI_META['BI_STAGING_TABLE']}"
    cur = conn.cursor()
    cur.execute(f"SELECT COALESCE(MAX(event_time), '2025-06-30') FROM {staging_table}")
    row = cur.fetchone()
    logging.info(f"Getting maximum event timestamp {row}")
    if not row:
        return datetime(2025, 6, 30)
    return row[0]

# === Get Token via Client Credentials Flow ==
# === Note: this will not suffice for personal data
def get_access_token(timeout=30):
    """Fetch an app-only access token via the OAuth Client Credentials grant.

    As noted above, such a token has no user context and is not sufficient for
    personal-data endpoints.

    Args:
        timeout: seconds to wait for the OAuth endpoint before giving up.

    Returns:
        The access token string.

    Raises:
        requests.HTTPError: if the OAuth endpoint answers with a non-2xx status.
    """
    logging.info("Getting OAuth")

    # Client authentication via HTTP Basic: base64("client_id:client_secret").
    auth_str = f"{API_CREDS['CLIENT_ID']}:{API_CREDS['CLIENT_SECRET']}"
    b64_auth_str = base64.b64encode(auth_str.encode()).decode()

    headers = {
        'Authorization': f'Basic {b64_auth_str}',
        'Content-Type': 'application/x-www-form-urlencoded'
    }

    data = {
        'grant_type': 'client_credentials'
    }

    url = API_CREDS['OAUTH_URL']

    # Without a timeout requests can block indefinitely on a stalled connection.
    res = requests.post(url, headers=headers, data=data, timeout=timeout)
    res.raise_for_status()

    return res.json()['access_token']

# === Fetch paginated spotify usage data ===
def fetch_usage_data(timestamp_ms, token, limit=20, timeout=30):
    """Collect recently-played items after a Unix-millisecond cursor, following ``next`` links.

    Args:
        timestamp_ms: value sent as the ``after`` query parameter (Unix time, ms).
        token: OAuth bearer token with access to the user's playback history.
        limit: page size sent on the first request (Spotify documents 1-50).
        timeout: seconds to wait for each HTTP request.

    Returns:
        List of item dicts concatenated across all pages.

    Raises:
        requests.HTTPError: if any page request returns a non-2xx status.
    """
    logging.info("Getting REST Data")
    headers = {"Authorization": f"Bearer {token}"}
    url = f"{API_CREDS['REST_URL']}/me/player/recently-played"
    # Query parameters go on the first request only; the "next" URLs returned
    # by the API are used verbatim (presumably they already embed their cursor).
    params = {"after": timestamp_ms, "limit": limit}
    rows = []
    page = 1

    while url:
        logging.info(f"Page_{page:02d}")
        resp = requests.get(url, headers=headers, params=params, timeout=timeout)
        resp.raise_for_status()
        data = resp.json()
        rows.extend(data.get("items", []))
        url = data.get("next")
        params = None
        page += 1

    return rows

# === Sanitize JSON row ===
def remove_available_markets_fields(rows):
    """Drop the ``available_markets`` lists from each item's track and album, in place.

    Items whose ``track`` (or whose track's ``album``) is missing, ``None`` or not
    a dict are left untouched instead of raising.

    Args:
        rows: list of recently-played item dicts; mutated in place.

    Returns:
        None.
    """
    for item in rows:
        track = item.get("track")
        if not isinstance(track, dict):
            # e.g. {"track": null}: `.get(..., {})` would return None and crash on .pop
            continue
        track.pop("available_markets", None)

        album = track.get("album")
        if isinstance(album, dict):
            album.pop("available_markets", None)

# === Hash JSON row ===
def hash_row(row):
    """Return the hex SHA-256 digest of ``row``'s key-sorted JSON form (key order does not matter)."""
    canonical = json.dumps(row, sort_keys=True)
    digest = hashlib.sha256(canonical.encode())
    return digest.hexdigest()

# === Insert new data (with hashdiff) ===
def _parse_event_time(value):
    """Parse an ISO-8601 timestamp string into an aware datetime, or return None if unusable.

    A trailing "Z" is rewritten to "+00:00" because ``datetime.fromisoformat``
    only accepts "Z" from Python 3.11 on. Naive results are tagged as UTC so
    they can be compared with the aware running maximum.
    NOTE(review): the UTC assumption for offset-less strings is a guess — confirm.
    """
    if not isinstance(value, str):
        return None
    try:
        parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    except ValueError:
        return None
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def insert_new_data(conn, rows):
    """Insert rows not yet in the staging table, de-duplicated by content hash, then commit.

    Rows without the timestamp key named by ``BI_META['BI_INGEST_TS']``, or with
    an unparsable value, are skipped with a warning.

    Args:
        conn: open pyodbc connection; committed once after all rows are processed.
        rows: list of JSON-serialisable dicts.

    Returns:
        ``(inserted_count, max_tf_ts)``: the number of inserted rows and the
        latest event time among them. ``max_tf_ts`` stays at the
        2025-06-30 UTC floor when nothing newer is inserted.
    """
    logging.info("Inserting relevant data to DB")
    ts_key = BI_META['BI_INGEST_TS']
    staging_table = f"{SQL_CREDS['PG_SCHEMA']}.{BI_META['BI_STAGING_TABLE']}"
    inserted_count = 0
    duplicate_count = 0
    max_tf_ts = datetime(2025, 6, 30, tzinfo=timezone.utc)

    cursor = conn.cursor()

    for row_no, row in enumerate(rows, start=1):
        tf_ts_str = row.get(ts_key)
        if not tf_ts_str:
            logging.warning(f"JSON payload does not contain {ts_key} key")
            continue  # skip if missing transaction timestamp

        tf_ts = _parse_event_time(tf_ts_str)
        if tf_ts is None:
            # Report the row's position in the batch, not how many were inserted so far.
            logging.warning(f"Row:{row_no}: Timestamp {ts_key} malformed")
            continue  # skip malformed timestamp

        hash_val = hash_row(row)

        # Skip duplicates (also catches rows inserted earlier in this same
        # uncommitted transaction, since the same connection sees them).
        cursor.execute(f"SELECT 1 FROM {staging_table} WHERE hash = ?", (hash_val,))
        if cursor.fetchone():
            duplicate_count += 1
            continue

        # Insert new record
        cursor.execute(
            f"INSERT INTO {staging_table} (event_time, data_json, hash) VALUES (?, ?, ?)",
            (tf_ts, json.dumps(row), hash_val)
        )
        inserted_count += 1
        max_tf_ts = max(max_tf_ts, tf_ts)

    conn.commit()
    logging.info(f"{inserted_count} rows inserted to DB, {duplicate_count} duplicates skipped")
    return inserted_count, max_tf_ts

# === Log ETL run result ===
def log_etl_result(conn, success, inserted_rows, max_ts):
    """Append one audit row for this ETL run to the log table and commit.

    Args:
        conn: open pyodbc connection.
        success: whether the run finished without an exception.
        inserted_rows: number of staging rows the run inserted.
        max_ts: value written to ``max_event_time`` (passed through unchanged).
    """
    logging.info("Logging ETL run to DB")
    log_table = f"{SQL_CREDS['PG_SCHEMA']}.{BI_META['BI_LOG_TABLE']}"
    statement = (
        f"INSERT INTO {log_table} "
        "(run_time, service_name, success, inserted_rows, max_event_time) "
        "VALUES (?, ?, ?, ?, ?)"
    )
    cur = conn.cursor()
    run_time = datetime.now(timezone.utc)
    cur.execute(statement, (run_time, BI_META['BI_SERVICE_NAME'], success, inserted_rows, max_ts))
    conn.commit()

In [None]:
# === Main ETL logic ===
def run_etl(token=None):
    """Run one incremental load: fetch plays after the newest staged event and store the new ones.

    Args:
        token: Spotify user access token. Defaults to the notebook-global
            ``access_token`` obtained in the authorization cells.

    Once the DB connection is open, errors are not re-raised. The run outcome
    (success flag, inserted count, highest stored event time) goes to the ETL
    log table, the exception is logged with its traceback, and the connection
    is always closed.
    """
    if token is None:
        token = access_token

    conn = get_sql_conn()
    inserted = 0
    max_event_ts = None

    try:
        # read maximum timestamp in staging_table
        latest_ts = get_latest_timestamp(conn)
        logging.info(f"Latest timestamp is: {latest_ts}")
        # Until newer rows are inserted, the high-water mark is what is already stored.
        max_event_ts = latest_ts

        # NOTE(review): if latest_ts comes back naive, .timestamp() interprets it in
        # the machine's local time zone — confirm the DB/session returns UTC.
        latest_ts_int = int(latest_ts.timestamp() * 1000)
        logging.info(f"Daily extract for unix time offset {latest_ts_int} ms.")

        rows = fetch_usage_data(latest_ts_int, token)

        if rows:
            # simplify payload, in-place
            remove_available_markets_fields(rows)
            inserted, new_max_ts = insert_new_data(conn, rows)
            if inserted:
                max_event_ts = new_max_ts

        log_etl_result(conn, True, inserted, max_event_ts)
        logging.info(f"✅ Success: Inserted {inserted} records.")

    except Exception as e:
        logging.exception(f"❌ Failure: {e}")
        try:
            # Discard any uncommitted partial inserts (they are not counted in
            # `inserted`); PostgreSQL also refuses new statements in a
            # transaction that already hit an error.
            conn.rollback()
            log_etl_result(conn, False, inserted, max_event_ts)
        except Exception:
            # Don't let the bookkeeping failure hide the original error above.
            logging.exception("Could not record the failed run in the ETL log table")

    finally:
        conn.close()

run_etl()

In [None]:
# Halt "Run All" here so the appendix cells below are not executed automatically.
raise SystemExit

### Appendix

In [None]:
from datetime import datetime, timezone, time

# Current moment as an aware UTC datetime, rendered in ISO-8601 form
# (the textual shape a timestamptz literal takes).
now_utc = datetime.now(timezone.utc)
timestamptz_value = now_utc.isoformat()
logging.info(timestamptz_value)  # e.g. '2025-07-09T15:30:00+00:00'

# Today's UTC midnight, expressed as Unix time in milliseconds
# (the unit used for the API's "after" cursor).
midnight_utc = datetime.combine(datetime.now(timezone.utc).date(), time.min, tzinfo=timezone.utc)
unix_ms_utc = int(midnight_utc.timestamp() * 1000)
logging.info(unix_ms_utc)  # e.g. 1752019200000

In [None]:
# Scratch check: read the staging table's high-water mark and show it as Unix milliseconds.
conn = get_sql_conn()
try:
    # read maximum timestamp in staging_table
    latest_ts = get_latest_timestamp(conn)
finally:
    # Release the DB connection even if the query fails.
    conn.close()

# NOTE(review): if latest_ts is naive, .timestamp() interprets it in the machine's local time zone.
latest_ts.timestamp() * 1000