# 01 Spotify Ingestion, Cleaning, and SQLite Load

This notebook ingests exported Spotify Streaming History JSON files, normalizes them into a canonical event schema, creates session IDs, and loads the results into a SQLite database table called `listening_events`.

Notes:
- This notebook is Spotify-only; Apple Music will be added later.
- The goal is to create a clean event table, not compute KPIs yet.

In [None]:
import sqlite3
import pandas as pd
import json
import glob
from pathlib import Path

## Step 1, Define paths and basic parameters

Expected Spotify export files:
- `data/raw/spotify/StreamingHistory*.json`

Output:
- SQLite database: `data/processed/MusicPlatformInsights.db`
- Table: `listening_events`

In [None]:
DatabasePath = "data/processed/MusicPlatformInsights.db"
RawDataPath = "data/raw/spotify/StreamingHistory*.json"

SessionGapMinutes = 30
MinPlaySeconds = 30

## Step 2, Find Spotify files

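If no files are found, the cell below raises `FileNotFoundError`. This usually just means Spotify has not finished preparing the export, or the files have not been copied into `data/raw/spotify/` yet.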

In [None]:
files = glob.glob(RawDataPath)

print("Files found:", len(files))
for file in files[:5]:
    print(file)

if len(files) == 0:
    raise FileNotFoundError("No Spotify StreamingHistory JSON files found in data/raw/spotify/")

## Step 3, Load all JSON files into a single DataFrame

Spotify files are lists of listening events. We will stack them together into one dataset.

In [None]:
SpotifyFrames = []

for file in files:
    print(f"Loading {file}...")
    with open(file, "r", encoding="utf-8") as f:
        SpotifyFrames.append(pd.DataFrame(json.load(f)))

spotify = pd.concat(SpotifyFrames, ignore_index=True)

print("Rows:", len(spotify))
spotify.head()
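
As a rough illustration of what the loading step does, the sketch below stacks two small frames built from records shaped like the fields listed in Step 4. The sample values and the names `sample_records`, `part_one`, `part_two`, and `stacked` are made up for this example.

```python
import pandas as pd

# Hypothetical records in the shape this notebook expects; the values are invented.
sample_records = [
    {"endTime": "2023-01-01 10:00", "artistName": "Artist A", "trackName": "Track 1", "msPlayed": 180000},
    {"endTime": "2023-01-01 10:03", "artistName": "Artist B", "trackName": "Track 2", "msPlayed": 12000},
]

# A list of dicts becomes one row per dict and one column per key.
part_one = pd.DataFrame(sample_records[:1])
part_two = pd.DataFrame(sample_records[1:])

# ignore_index=True rebuilds a continuous 0..n-1 index across the stacked parts.
stacked = pd.concat([part_one, part_two], ignore_index=True)
print(stacked)
```

Without `ignore_index=True`, each file would keep its own index starting at 0, so index labels would repeat in the combined frame.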

## Step 4, Rename columns to canonical schema and clean types

Spotify fields we expect:
- endTime
- artistName
- trackName
- msPlayed

Canonical fields (`session_id` is added in Step 6):
- event_time
- platform
- artist
- track
- duration_minutes

In [None]:
spotify = spotify.rename(columns={
    "endTime": "event_time",
    "artistName": "artist",
    "trackName": "track",
    "msPlayed": "ms_played"
})

spotify["event_time"] = pd.to_datetime(spotify["event_time"], errors="coerce")
spotify["platform"] = "spotify"
spotify["duration_minutes"] = spotify["ms_played"] / 1000 / 60

# Filter very short plays
spotify = spotify[spotify["ms_played"] >= (MinPlaySeconds * 1000)]

# Keep only canonical fields for now
spotify = spotify[["event_time", "platform", "artist", "track", "duration_minutes"]]

spotify = spotify.sort_values("event_time")

print("Clean rows:", len(spotify))
spotify.head()
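
The toy example below sketches how the cleaning rules behave on edge cases. It assumes a hypothetical three-row frame named `toy`; `min_play_seconds` mirrors `MinPlaySeconds` above. The point is that `errors="coerce"` turns an unparseable timestamp into `NaT` rather than raising, and that such rows survive the short-play filter.

```python
import pandas as pd

min_play_seconds = 30  # mirrors MinPlaySeconds in Step 1

# Hypothetical rows: one valid play, one with a broken timestamp, one very short play.
toy = pd.DataFrame({
    "event_time": ["2023-01-01 10:00", "not a date", "2023-01-01 10:05"],
    "ms_played": [180000, 240000, 12000],
})

# Unparseable strings become NaT instead of raising an exception.
toy["event_time"] = pd.to_datetime(toy["event_time"], errors="coerce")

# Milliseconds -> seconds -> minutes.
toy["duration_minutes"] = toy["ms_played"] / 1000 / 60

# Keep plays of at least min_play_seconds; the NaT row is NOT removed by this filter.
kept = toy[toy["ms_played"] >= min_play_seconds * 1000]
print(kept)
```

This is one reason the null counts in Step 5 are worth reading: a non-zero count for `event_time` means some timestamps failed to parse.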

## Step 5, Quick sanity checks

We check:
- date range
- null counts
- basic duration stats

In [None]:
print("Min event_time:", spotify["event_time"].min())
print("Max event_time:", spotify["event_time"].max())

print("\nNull counts:")
print(spotify.isna().sum())

print("\nDuration minutes summary:")
print(spotify["duration_minutes"].describe())
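
If printed summaries are easy to overlook, the same checks could be written as hard failures. The sketch below is one possible shape, not part of the pipeline; `check_events` is a hypothetical helper and the thresholds are assumptions.

```python
import pandas as pd

def check_events(events: pd.DataFrame) -> None:
    """Hypothetical helper: raise if the cleaned frame violates basic expectations."""
    if events["event_time"].isna().any():
        raise ValueError("event_time contains unparseable timestamps (NaT)")
    if (events["duration_minutes"] <= 0).any():
        raise ValueError("duration_minutes must be positive")
    if not events["event_time"].is_monotonic_increasing:
        raise ValueError("events are not sorted by event_time")

# Usage on a small, made-up frame that satisfies every check.
good = pd.DataFrame({
    "event_time": pd.to_datetime(["2023-01-01 10:00", "2023-01-01 10:05"]),
    "duration_minutes": [3.0, 4.0],
})
check_events(good)  # passes silently
```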

## Step 6, Create session IDs

Definition:
- A new session starts at the first event, and again whenever the gap between consecutive events is greater than `SessionGapMinutes`.

In [None]:
spotify["prev_time"] = spotify["event_time"].shift()
spotify["gap_minutes"] = (spotify["event_time"] - spotify["prev_time"]).dt.total_seconds() / 60

spotify["new_session"] = spotify["gap_minutes"].isna() | (spotify["gap_minutes"] > SessionGapMinutes)
spotify["session_id"] = spotify["new_session"].cumsum()

spotify = spotify.drop(columns=["prev_time", "gap_minutes", "new_session"])

spotify.head()
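
To make the boundary behaviour concrete, here is a minimal sketch of the same rule on four made-up timestamps. It uses `diff()` in place of the `shift()` and subtraction above, which should be equivalent; `toy` and `session_gap_minutes` are names assumed for this example.

```python
import pandas as pd

session_gap_minutes = 30  # mirrors SessionGapMinutes in Step 1

# Hypothetical, already-sorted event times.
toy = pd.DataFrame({
    "event_time": pd.to_datetime([
        "2023-01-01 10:00",  # first event: always starts a session
        "2023-01-01 10:04",  # 4-minute gap: same session
        "2023-01-01 10:34",  # exactly 30 minutes: not greater than the gap, same session
        "2023-01-01 11:05",  # 31 minutes: new session
    ])
})

# Gap to the previous event in minutes; NaN for the first row.
gap_minutes = toy["event_time"].diff().dt.total_seconds() / 60

# True marks the first event of each session; the running sum numbers the sessions.
new_session = gap_minutes.isna() | (gap_minutes > session_gap_minutes)
toy["session_id"] = new_session.cumsum()
print(toy)
```

Because the comparison is strictly greater-than, a gap of exactly `SessionGapMinutes` stays in the current session. Any row whose `event_time` is `NaT` would also produce a missing gap and therefore start a session of its own.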

## Step 7, Create SQLite database and table

We will store all normalized events in one table:
- `listening_events`

Spotify loads first and initializes the table.
Apple Music will later append rows to the same table.

In [None]:
# Ensure output folder exists
Path("data/processed").mkdir(parents=True, exist_ok=True)

connect = sqlite3.connect(DatabasePath)
cursor = connect.cursor()

cursor.execute("""
CREATE TABLE IF NOT EXISTS listening_events (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    event_time TEXT,
    platform TEXT,
    artist TEXT,
    track TEXT,
    duration_minutes REAL,
    session_id INTEGER
);
""")

connect.commit()
print("Table ready: listening_events")

## Step 8, Load Spotify events into SQLite

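While iterating on the Spotify-first pipeline, we clear any existing rows before loading so that each run starts from a clean table. Deleting the rows and appending, rather than using `if_exists="replace"`, keeps the schema created in Step 7; `replace` would drop the table and recreate it from the DataFrame's columns, without the `id` column.

In [None]:
# Clear previous contents but keep the Step 7 schema (including the id column)
connect.execute("DELETE FROM listening_events")

spotify.to_sql(
    "listening_events",
    connect,
    if_exists="append",
    index=False
)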

connect.commit()

print("Loaded rows:", len(spotify))
print("Database:", DatabasePath)

connect.close()
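
To see how the choice of `if_exists` affects a table that was created up front, here is a minimal in-memory sketch. The table name `demo_events`, the helper `column_names`, and the sample row are illustrative assumptions, not part of the pipeline.

```python
import sqlite3
import pandas as pd

# Hypothetical one-row frame; note that it has no id column.
events = pd.DataFrame({"platform": ["spotify"], "artist": ["Artist A"]})

def column_names(connection, table):
    # PRAGMA table_info returns one row per column; position 1 holds the column name.
    return [row[1] for row in connection.execute(f"PRAGMA table_info({table})")]

connection = sqlite3.connect(":memory:")
connection.execute(
    "CREATE TABLE demo_events ("
    "id INTEGER PRIMARY KEY AUTOINCREMENT, platform TEXT, artist TEXT)"
)

# append inserts into the existing table, so the id column is kept.
events.to_sql("demo_events", connection, if_exists="append", index=False)
columns_after_append = column_names(connection, "demo_events")

# replace drops the table and recreates it from the DataFrame's columns only.
events.to_sql("demo_events", connection, if_exists="replace", index=False)
columns_after_replace = column_names(connection, "demo_events")

connection.close()
print(columns_after_append)
print(columns_after_replace)
```

This matters once Apple Music rows are appended to the same table: a schema defined once with `CREATE TABLE` is only preserved if every load appends to it.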

## Step 9, Verify rows exist in SQLite

We run a simple SQL check to confirm the table is populated.

In [None]:
connect = sqlite3.connect(DatabasePath)

CheckQuery = """
SELECT 
    COUNT(*) AS RowCount,
    MIN(event_time) AS MinTime,
    MAX(event_time) AS MaxTime
FROM listening_events
WHERE platform = 'spotify';
"""

verification = pd.read_sql_query(CheckQuery, connect)
connect.close()

verification