# ðŸŽµ Musical Tracks Database â€“ Core Build
### SQLite project for track/artist/album data ingestion
**Phase 1**: Core setup â€” imports, directories, schema, data loader  
Author: Gabe M Chavez  


Cell 1 â€” Setup & Imports

In [None]:
# ==========================================================
# Project: Musical Tracks Database (Portfolio Version)
# Author: Gabe M Chavez
# Purpose: Build a clean, linear SQLite database from CSV/JSON
#          inspired by the PY4E Tracks Assignment.
# Phase: Core Build â€” Schema â†’ Ingest â†’ Assignment Joins
# ==========================================================

# Cell 1 â€” Setup & Imports
from pathlib import Path
import sqlite3
import csv
import json

# Working directories (core-first build)
WORKDIR = Path('/content/music_core')
RAW_PATH = WORKDIR / 'raw'     # put your input CSV/JSON here
DB_PATH = WORKDIR / 'musicdb.sqlite'

# Ensure folders exist
WORKDIR.mkdir(parents=True, exist_ok=True)
RAW_PATH.mkdir(parents=True, exist_ok=True)

print("Working dir:", WORKDIR)
print("Raw data folder:", RAW_PATH)
print("DB path:", DB_PATH)


Working dir: /content/music_core
Raw data folder: /content/music_core/raw
DB path: /content/music_core/musicdb.sqlite


Cell 2 â€” Helper Functions

In [None]:
# ==========================================================
# Cell 2 â€” Helper Functions
# Purpose: Basic database helpers for connecting, running SQL,
#          and fetching results. Keeps code clean for later cells.
# ==========================================================

def connect_db(db_path=DB_PATH):
    """Create (or connect to) the SQLite database."""
    return sqlite3.connect(db_path)

def run_sql(conn, sql, params=None):
    """Execute SQL (DDL/DML) with optional params and commit."""
    cur = conn.cursor()
    cur.execute(sql, params or [])
    conn.commit()

def query_all(conn, sql, params=None):
    """Return list of rows from a SELECT query."""
    cur = conn.cursor()
    cur.execute(sql, params or [])
    return cur.fetchall()

def query_one(conn, sql, params=None):
    """Return a single row or None."""
    cur = conn.cursor()
    cur.execute(sql, params or [])
    return cur.fetchone()

print("Helper functions loaded.")


Helper functions loaded.


Cell 3 â€” Create Schema (DDL)


In [None]:
# ==========================================================
# Cell 3 â€” Create Schema (DDL)
# Purpose: Define normalized tables for Artist, Album, Genre, Track.
# Notes:
# - Uses UNIQUE constraints to avoid duplicates.
# - Enables FOREIGN KEY integrity.
# ==========================================================

SCHEMA_DDL = [
    """
    PRAGMA foreign_keys = ON;
    """,
    """
    CREATE TABLE IF NOT EXISTS Artist(
        id   INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT UNIQUE NOT NULL
    );
    """,
    """
    CREATE TABLE IF NOT EXISTS Album(
        id        INTEGER PRIMARY KEY AUTOINCREMENT,
        title     TEXT NOT NULL,
        artist_id INTEGER NOT NULL,
        year      INTEGER,
        FOREIGN KEY(artist_id) REFERENCES Artist(id),
        UNIQUE(title, artist_id)
    );
    """,
    """
    CREATE TABLE IF NOT EXISTS Genre(
        id   INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT UNIQUE
    );
    """,
    """
    CREATE TABLE IF NOT EXISTS Track(
        id           INTEGER PRIMARY KEY AUTOINCREMENT,
        title        TEXT NOT NULL,
        album_id     INTEGER NOT NULL,
        genre_id     INTEGER,
        milliseconds INTEGER,
        rating       INTEGER,
        play_count   INTEGER,
        FOREIGN KEY(album_id) REFERENCES Album(id),
        FOREIGN KEY(genre_id) REFERENCES Genre(id),
        UNIQUE(title, album_id)
    );
    """
]

def create_schema(conn):
    cur = conn.cursor()
    for stmt in SCHEMA_DDL:
        cur.execute(stmt)
    conn.commit()

# Execute once to build schema
conn = connect_db(DB_PATH)
create_schema(conn)
conn.close()
print("Schema created (Artist, Album, Genre, Track). Foreign keys enabled.")


Schema created (Artist, Album, Genre, Track). Foreign keys enabled.


Cell 4 â€” Data Cleaning Utils

In [None]:
# ==========================================================
# Cell 4 â€” Data Cleaning Utils
# Purpose: Normalize raw strings before upserts (trim, collapse
#          whitespace, standardize casing). Keeps ETL consistent.
# ==========================================================
import re

_ws_re = re.compile(r"\s+")

def _norm_base(s):
    """Trim and collapse inner whitespace. Return None if empty."""
    if s is None:
        return None
    s = str(s).strip()
    s = _ws_re.sub(" ", s)
    return s if s else None

def clean_artist(name: str):
    """Normalize artist names; keep original case where reasonable."""
    name = _norm_base(name)
    return name

def clean_album(title: str):
    """Normalize album titles; preserve case."""
    title = _norm_base(title)
    return title

def clean_title(title: str):
    """Normalize track titles; preserve case."""
    title = _norm_base(title)
    return title

def clean_genre(name: str):
    """Normalize genre names to Title Case (e.g., 'deep house' -> 'Deep House')."""
    name = _norm_base(name)
    if name is None:
        return None
    return name.title()

print("Cleaning helpers ready: clean_artist, clean_album, clean_title, clean_genre")


Cleaning helpers ready: clean_artist, clean_album, clean_title, clean_genre


Cell 5 â€” Loader (CSV/JSON) + Upserts

In [None]:
# ==========================================================
# Cell 5 â€” Loader (CSV/JSON) + Upserts
# Purpose: Read rows from CSV/JSON and upsert into Artist, Album,
#          Genre, Track with idempotent operations.
# Notes:
# - Accepts flexible column names (e.g., title/track, milliseconds/length).
# - Uses INSERT OR IGNORE, then SELECT id to fetch PKs.
# ==========================================================
from pathlib import Path

# ---------- Upserts ----------

def upsert_artist(conn, name: str) -> int:
    name = clean_artist(name)
    if not name:
        raise ValueError("Artist name is required")
    cur = conn.cursor()
    cur.execute("INSERT OR IGNORE INTO Artist(name) VALUES (?)", (name,))
    cur.execute("SELECT id FROM Artist WHERE name = ?", (name,))
    return cur.fetchone()[0]

def upsert_album(conn, title: str, artist_name: str, year: int | None = None) -> int:
    title = clean_album(title)
    if not title:
        raise ValueError("Album title is required")
    artist_id = upsert_artist(conn, artist_name)
    cur = conn.cursor()
    cur.execute(
        "INSERT OR IGNORE INTO Album(title, artist_id, year) VALUES (?, ?, ?)",
        (title, artist_id, year),
    )
    cur.execute("SELECT id FROM Album WHERE title = ? AND artist_id = ?", (title, artist_id))
    return cur.fetchone()[0]

def upsert_genre(conn, name: str | None) -> int | None:
    name = clean_genre(name)
    if not name:
        return None
    cur = conn.cursor()
    cur.execute("INSERT OR IGNORE INTO Genre(name) VALUES (?)", (name,))
    cur.execute("SELECT id FROM Genre WHERE name = ?", (name,))
    row = cur.fetchone()
    return row[0] if row else None

def upsert_track(
    conn,
    title: str,
    album_title: str,
    artist_name: str,
    genre_name: str | None = None,
    milliseconds: int | None = None,
    rating: int | None = None,
    play_count: int | None = None,
    year: int | None = None,
) -> int:
    title = clean_title(title)
    if not title:
        raise ValueError("Track title is required")

    album_id = upsert_album(conn, album_title, artist_name, year)
    genre_id = upsert_genre(conn, genre_name)

    cur = conn.cursor()
    cur.execute(
        """
        INSERT OR IGNORE INTO Track(title, album_id, genre_id, milliseconds, rating, play_count)
        VALUES (?, ?, ?, ?, ?, ?)
        """,
        (title, album_id, genre_id, milliseconds, rating, play_count),
    )
    # Uniqueness is (title, album_id)
    cur.execute("SELECT id FROM Track WHERE title = ? AND album_id = ?", (title, album_id))
    return cur.fetchone()[0]

# ---------- Row parsing helpers ----------

def _get_any(d: dict, *names, default=None):
    """Return the first present non-empty field from candidate names (case-insensitive)."""
    # build case-insensitive map
    lower_map = {str(k).lower(): v for k, v in d.items()}
    for n in names:
        if n is None:
            continue
        v = lower_map.get(str(n).lower())
        if v is not None and str(v).strip() != "":
            return v
    return default

def _to_int_or_none(x):
    try:
        if x is None or str(x).strip() == "":
            return None
        return int(float(x))
    except Exception:
        return None

# Accept common field names from varied exports
FIELD_MAP = {
    "title": ("title", "track", "name", "track_name"),
    "artist": ("artist", "artist_name"),
    "album": ("album", "album_title"),
    "genre": ("genre",),
    "milliseconds": ("milliseconds", "duration_ms", "length", "time_ms"),
    "rating": ("rating", "stars"),
    "play_count": ("play_count", "plays", "playcount"),
    "year": ("year", "release_year"),
}

# ---------- CSV / JSON readers ----------

def load_rows_from_csv(path: Path):
    """Yield dictionaries from CSV with flexible headers."""
    with open(path, newline="", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        for row in reader:
            yield row

def load_rows_from_json(path: Path):
    """Yield dictionaries from JSON. Supports list[...] or dict{items:[...] }."""
    with open(path, encoding="utf-8") as f:
        data = json.load(f)
    if isinstance(data, list):
        for row in data:
            yield row
    elif isinstance(data, dict):
        # try common container keys
        for key in ("items", "tracks", "rows", "data"):
            if key in data and isinstance(data[key], list):
                for row in data[key]:
                    yield row
                return
        # fallback: yield the dict itself if it looks like a single row
        yield data
    else:
        raise ValueError("Unsupported JSON structure")

def row_to_track_kwargs(row: dict) -> dict:
    """Map a raw row to kwargs for upsert_track with cleaning and type conversion."""
    title = _get_any(row, *FIELD_MAP["title"])
    artist = _get_any(row, *FIELD_MAP["artist"])
    album = _get_any(row, *FIELD_MAP["album"])
    genre = _get_any(row, *FIELD_MAP["genre"])
    milliseconds = _to_int_or_none(_get_any(row, *FIELD_MAP["milliseconds"]))
    rating = _to_int_or_none(_get_any(row, *FIELD_MAP["rating"]))
    play_count = _to_int_or_none(_get_any(row, *FIELD_MAP["play_count"]))
    year = _to_int_or_none(_get_any(row, *FIELD_MAP["year"]))

    return dict(
        title=title,
        album_title=album,
        artist_name=artist,
        genre_name=genre,
        milliseconds=milliseconds,
        rating=rating,
        play_count=play_count,
        year=year,
    )

print("Loaders and upserts ready (CSV/JSON, flexible fields).")


Loaders and upserts ready (CSV/JSON, flexible fields).


Cell 6 â€” ETL Pipeline (ingest)

In [None]:
# ==========================================================
# Cell 6 â€” ETL Pipeline (ingest)
# Purpose: Read a CSV/JSON file, transform rows -> upserts, and
#          load into the DB. Idempotent & safe to re-run.
# ==========================================================
from typing import Iterable

def _detect_loader(path: Path):
    suf = path.suffix.lower()
    if suf == ".csv":
        return load_rows_from_csv
    if suf in {".json", ".jsn"}:
        return load_rows_from_json
    raise ValueError(f"Unsupported file type: {suf}")

def ingest_file(path: Path, batch_size: int = 500) -> dict:
    """
    Ingest a single CSV/JSON file into the database.
    Returns stats dict: {'rows': int, 'inserted': int, 'skipped': int, 'errors': int}
    """
    if not path.exists():
        raise FileNotFoundError(path)
    loader = _detect_loader(path)

    stats = {"rows": 0, "inserted": 0, "skipped": 0, "errors": 0}
    conn = connect_db(DB_PATH)
    cur = conn.cursor()
    try:
        for row in loader(path):
            stats["rows"] += 1
            try:
                kwargs = row_to_track_kwargs(row)
                # minimal required fields
                if not kwargs["title"] or not kwargs["album_title"] or not kwargs["artist_name"]:
                    stats["skipped"] += 1
                    continue
                _ = upsert_track(conn, **kwargs)
                stats["inserted"] += 1
            except Exception as e:
                stats["errors"] += 1
                # Lightweight debug: show every 200th error to avoid noise
                if stats["errors"] <= 5 or stats["errors"] % 200 == 0:
                    print("Row error:", e)

            # Commit in batches
            if stats["rows"] % batch_size == 0:
                conn.commit()
                print(f" ... processed {stats['rows']} rows")

        conn.commit()
    finally:
        conn.close()

    print(f"Ingest complete: {stats}")
    return stats

def ingest_all_in_raw() -> dict:
    """
    Ingest all CSV/JSON files found in RAW_PATH.
    Aggregates stats across files.
    """
    totals = {"rows": 0, "inserted": 0, "skipped": 0, "errors": 0, "files": 0}
    files = sorted([p for p in RAW_PATH.iterdir() if p.suffix.lower() in {".csv", ".json", ".jsn"}])
    if not files:
        print(f"No CSV/JSON files found in {RAW_PATH}")
        return totals
    for f in files:
        print(f"Ingesting: {f.name}")
        s = ingest_file(f)
        for k in ("rows", "inserted", "skipped", "errors"):
            totals[k] += s[k]
        totals["files"] += 1
    print(f"All ingests complete: {totals}")
    return totals

print("ETL ready: ingest_file(path), ingest_all_in_raw()")


ETL ready: ingest_file(path), ingest_all_in_raw()


Cell 7 â€” Sanity Checks

In [None]:
# ==========================================================
# Cell 7 â€” Sanity Checks
# Purpose: Quick integrity & content checks after ingestion.
# - Table row counts
# - SQLite integrity + foreign key checks
# - Data quality: tracks missing genre
# - Preview sample rows from each table
# ==========================================================

def _print_counts(conn):
    tables = ["Artist", "Album", "Genre", "Track"]
    for t in tables:
        row = query_one(conn, f"SELECT COUNT(*) FROM {t}")
        print(f"{t:>6}: {row[0] if row else 0}")

def _print_sample(conn, table, limit=5):
    print(f"\nSample from {table} (up to {limit} rows):")
    rows = query_all(conn, f"SELECT * FROM {table} LIMIT {limit}")
    for r in rows:
        print("  ", r)

def sanity_report():
    conn = connect_db(DB_PATH)
    try:
        print("== Table counts ==")
        _print_counts(conn)

        print("\n== SQLite integrity checks ==")
        print("PRAGMA integrity_check =>", query_one(conn, "PRAGMA integrity_check;")[0])
        fk_rows = query_all(conn, "PRAGMA foreign_key_check;")
        if fk_rows:
            print("Foreign key issues:")
            for r in fk_rows[:10]:
                print("  ", r)
            if len(fk_rows) > 10:
                print(f"  ... and {len(fk_rows) - 10} more")
        else:
            print("Foreign key check => OK")

        print("\n== Data quality checks ==")
        # Tracks without a genre are allowed, but weâ€™ll report how many:
        missing_genre = query_one(conn, "SELECT COUNT(*) FROM Track WHERE genre_id IS NULL")[0]
        print(f"Tracks missing genre: {missing_genre}")

        # Duplicate (title, album) attempts (should be prevented by UNIQUE):
        dupes = query_one(conn, """
            SELECT COUNT(*) FROM (
                SELECT title, album_id, COUNT(*) c
                FROM Track
                GROUP BY title, album_id
                HAVING c > 1
            )
        """)[0]
        print(f"Duplicate (title, album) groups: {dupes}")

        # Orphan checks (should be zero due to FK constraints):
        orphan_albums = query_one(conn, """
            SELECT COUNT(*) FROM Album a
            WHERE NOT EXISTS (SELECT 1 FROM Artist ar WHERE ar.id = a.artist_id)
        """)[0]
        print(f"Albums without a valid artist (orphans): {orphan_albums}")

        print("\n== Samples ==")
        _print_sample(conn, "Artist")
        _print_sample(conn, "Album")
        _print_sample(conn, "Genre")
        _print_sample(conn, "Track")

    finally:
        conn.close()
    print("\nSanity report complete.")

print("Sanity checks ready. Call sanity_report() after ingest.")


Sanity checks ready. Call sanity_report() after ingest.


Cell 8 â€” Assignment Joins (Top 3, etc.)

In [None]:
# ==========================================================
# Cell 8 â€” Assignment Joins (Top 3, etc.)
# Purpose: Run the canonical joined SELECTs for the assignment.
# Outputs:
#  - Top 3 joined rows: Track, Artist, Album, Genre (alphabetical)
#  - Optional: a few handy summaries (counts)
# ==========================================================

def run_assignment_queries(limit=3):
    conn = connect_db(DB_PATH)
    try:
        print("== Top joined results ==")
        rows = query_all(conn, f"""
            SELECT Track.title, Artist.name, Album.title, COALESCE(Genre.name, 'Unknown')
            FROM Track
            JOIN Album ON Track.album_id = Album.id
            JOIN Artist ON Album.artist_id = Artist.id
            LEFT JOIN Genre ON Track.genre_id = Genre.id
            ORDER BY Artist.name, Album.title, Track.title
            LIMIT {int(limit)};
        """)
        for r in rows:
            print(tuple(r))

        # Optional small summaries (helpful for sanity and grading)
        print("\n== Summary counts ==")
        print("Artists:", query_one(conn, "SELECT COUNT(*) FROM Artist")[0])
        print("Albums :", query_one(conn, "SELECT COUNT(*) FROM Album")[0])
        print("Genres :", query_one(conn, "SELECT COUNT(*) FROM Genre")[0])
        print("Tracks :", query_one(conn, "SELECT COUNT(*) FROM Track")[0])

    finally:
        conn.close()
    print("\nAssignment queries complete.")

print("Assignment queries ready. Call run_assignment_queries().")


Assignment queries ready. Call run_assignment_queries().


Cell 9 â€” Analytics & Insights

In [None]:
# ==========================================================
# Cell 9 â€” Analytics & Insights
# Purpose: Portfolio-friendly summaries you can show in README.
# Outputs (printed):
#  - Top artists by number of tracks
#  - Top genres by tracks
#  - Longest tracks (by milliseconds)
#  - Albums with most tracks
#  - Data quality: tracks missing genre, duplicate titles per artist
# ==========================================================

def _print_rows(title, rows, limit=10):
    print(f"\n== {title} (top {limit}) ==")
    for r in rows[:limit]:
        print("  ", tuple(r))

def analytics_report(top_n=10):
    conn = connect_db(DB_PATH)
    try:
        # Top artists by track count
        artists = query_all(conn, """
            SELECT ar.name, COUNT(t.id) AS track_count
            FROM Artist ar
            JOIN Album al ON al.artist_id = ar.id
            JOIN Track t  ON t.album_id = al.id
            GROUP BY ar.id
            ORDER BY track_count DESC, ar.name ASC
            LIMIT ?;
        """, (top_n,))
        _print_rows("Top Artists by Track Count", artists, limit=top_n)

        # Top genres by track count
        genres = query_all(conn, """
            SELECT COALESCE(g.name, 'Unknown') AS genre, COUNT(t.id) AS track_count
            FROM Track t
            LEFT JOIN Genre g ON t.genre_id = g.id
            GROUP BY genre
            ORDER BY track_count DESC, genre ASC
            LIMIT ?;
        """, (top_n,))
        _print_rows("Top Genres by Track Count", genres, limit=top_n)

        # Longest tracks
        longest = query_all(conn, """
            SELECT t.title, ar.name, al.title, t.milliseconds
            FROM Track t
            JOIN Album al ON t.album_id = al.id
            JOIN Artist ar ON al.artist_id = ar.id
            WHERE t.milliseconds IS NOT NULL
            ORDER BY t.milliseconds DESC, t.title ASC
            LIMIT ?;
        """, (top_n,))
        _print_rows("Longest Tracks (ms)", longest, limit=top_n)

        # Albums with most tracks
        albums = query_all(conn, """
            SELECT ar.name, al.title, COUNT(t.id) AS track_count
            FROM Album al
            JOIN Artist ar ON al.artist_id = ar.id
            JOIN Track t   ON t.album_id = al.id
            GROUP BY al.id
            ORDER BY track_count DESC, ar.name ASC, al.title ASC
            LIMIT ?;
        """, (top_n,))
        _print_rows("Albums with Most Tracks", albums, limit=top_n)

        # Data quality checks
        print("\n== Data Quality ==")
        missing_genre = query_one(conn, "SELECT COUNT(*) FROM Track WHERE genre_id IS NULL")[0]
        print("Tracks missing genre:", missing_genre)

        dup_title_artist = query_all(conn, """
            SELECT ar.name, t.title, COUNT(*) AS c
            FROM Track t
            JOIN Album al ON t.album_id = al.id
            JOIN Artist ar ON al.artist_id = ar.id
            GROUP BY ar.name, t.title
            HAVING c > 1
            ORDER BY c DESC, ar.name ASC, t.title ASC
            LIMIT ?;
        """, (top_n,))
        _print_rows("Duplicate Titles per Artist", dup_title_artist, limit=top_n)

    finally:
        conn.close()
    print("\nAnalytics report complete. Consider capturing outputs for README.")

print("Analytics ready. Call analytics_report(top_n=10).")

Analytics ready. Call analytics_report(top_n=10).


Cell 10 â€” Export Tools

In [None]:
# ==========================================================
# Cell 10 â€” Export Tools
# Purpose: Save portfolio-ready CSV summaries to /exports.
# Exports:
#  - top_artists.csv
#  - top_genres.csv
#  - longest_tracks.csv
#  - albums_most_tracks.csv
#  - tracks_missing_genre.csv
#  - duplicate_titles_per_artist.csv
# ==========================================================
from pathlib import Path

EXPORTS_PATH = WORKDIR / "exports"
EXPORTS_PATH.mkdir(parents=True, exist_ok=True)

def _write_csv(path: Path, header: list[str], rows: list[tuple]):
    with open(path, "w", newline="", encoding="utf-8") as f:
        w = csv.writer(f)
        w.writerow(header)
        for r in rows:
            w.writerow(list(r))

def export_summaries(top_n=100):
    conn = connect_db(DB_PATH)
    try:
        # Top artists by track count
        rows = query_all(conn, """
            SELECT ar.name, COUNT(t.id) AS track_count
            FROM Artist ar
            JOIN Album al ON al.artist_id = ar.id
            JOIN Track t  ON t.album_id = al.id
            GROUP BY ar.id
            ORDER BY track_count DESC, ar.name ASC
            LIMIT ?;
        """, (top_n,))
        _write_csv(EXPORTS_PATH / "top_artists.csv",
                   ["artist", "track_count"], rows)

        # Top genres by track count
        rows = query_all(conn, """
            SELECT COALESCE(g.name, 'Unknown') AS genre, COUNT(t.id) AS track_count
            FROM Track t
            LEFT JOIN Genre g ON t.genre_id = g.id
            GROUP BY genre
            ORDER BY track_count DESC, genre ASC
            LIMIT ?;
        """, (top_n,))
        _write_csv(EXPORTS_PATH / "top_genres.csv",
                   ["genre", "track_count"], rows)

        # Longest tracks
        rows = query_all(conn, """
            SELECT t.title, ar.name AS artist, al.title AS album, t.milliseconds
            FROM Track t
            JOIN Album al ON t.album_id = al.id
            JOIN Artist ar ON al.artist_id = ar.id
            WHERE t.milliseconds IS NOT NULL
            ORDER BY t.milliseconds DESC, t.title ASC
            LIMIT ?;
        """, (top_n,))
        _write_csv(EXPORTS_PATH / "longest_tracks.csv",
                   ["title", "artist", "album", "milliseconds"], rows)

        # Albums with most tracks
        rows = query_all(conn, """
            SELECT ar.name AS artist, al.title AS album, COUNT(t.id) AS track_count
            FROM Album al
            JOIN Artist ar ON al.artist_id = ar.id
            JOIN Track t   ON t.album_id = al.id
            GROUP BY al.id
            ORDER BY track_count DESC, artist ASC, album ASC
            LIMIT ?;
        """, (top_n,))
        _write_csv(EXPORTS_PATH / "albums_most_tracks.csv",
                   ["artist", "album", "track_count"], rows)

        # Tracks missing genre
        rows = query_all(conn, """
            SELECT t.title, ar.name AS artist, al.title AS album
            FROM Track t
            JOIN Album al ON t.album_id = al.id
            JOIN Artist ar ON al.artist_id = ar.id
            WHERE t.genre_id IS NULL
            ORDER BY artist ASC, album ASC, t.title ASC
            LIMIT ?;
        """, (max(top_n, 1000),))  # allow more here
        _write_csv(EXPORTS_PATH / "tracks_missing_genre.csv",
                   ["title", "artist", "album"], rows)

        # Duplicate titles per artist (same title appearing >1 across albums)
        rows = query_all(conn, """
            WITH title_counts AS (
                SELECT ar.name AS artist, t.title, COUNT(*) AS c
                FROM Track t
                JOIN Album al ON t.album_id = al.id
                JOIN Artist ar ON al.artist_id = ar.id
                GROUP BY ar.name, t.title
                HAVING c > 1
            )
            SELECT artist, title, c
            FROM title_counts
            ORDER BY c DESC, artist ASC, title ASC
            LIMIT ?;
        """, (top_n,))
        _write_csv(EXPORTS_PATH / "duplicate_titles_per_artist.csv",
                   ["artist", "title", "count"], rows)

        print("Exports written to:", EXPORTS_PATH)

    finally:
        conn.close()

print("Export tools ready. Call export_summaries(top_n=100).")


Export tools ready. Call export_summaries(top_n=100).


Cell 11 â€” README Generator

In [None]:
# ==========================================================
# Cell 11 â€” README Generator (Safe Version)
# Purpose: Create a clean README.md without f-strings or triple-quote issues.
# Output: /content/music_core/README.md
# ==========================================================
from datetime import datetime

README_PATH = WORKDIR / "README.md"

SCHEMA_ASCII = r"""
Schema (SQLite)
---------------
Artist(id PK, name UNIQUE NOT NULL)
Album(id PK, title NOT NULL, artist_id FK->Artist(id), year, UNIQUE(title, artist_id))
Genre(id PK, name UNIQUE)
Track(id PK, title NOT NULL, album_id FK->Album(id), genre_id FK->Genre(id),
      milliseconds, rating, play_count, UNIQUE(title, album_id))
"""

def _fetch_counts(conn):
    return {
        "artists":  query_one(conn, "SELECT COUNT(*) FROM Artist")[0],
        "albums":   query_one(conn, "SELECT COUNT(*) FROM Album")[0],
        "genres":   query_one(conn, "SELECT COUNT(*) FROM Genre")[0],
        "tracks":   query_one(conn, "SELECT COUNT(*) FROM Track")[0],
        "missing_genre": query_one(conn, "SELECT COUNT(*) FROM Track WHERE genre_id IS NULL")[0],
    }

def _fetch_top_preview(conn, limit=5):
    rows = query_all(conn, """
        SELECT t.title, ar.name, al.title, COALESCE(g.name, 'Unknown') AS genre
        FROM Track t
        JOIN Album al ON t.album_id = al.id
        JOIN Artist ar ON al.artist_id = ar.id
        LEFT JOIN Genre g ON t.genre_id = g.id
        ORDER BY ar.name, al.title, t.title
        LIMIT ?;
    """, (int(limit),))
    lines = ["| Track | Artist | Album | Genre |", "|---|---|---|---|"]
    for r in rows:
        lines.append(f"| {r[0]} | {r[1]} | {r[2]} | {r[3]} |")
    return "\n".join(lines)

def generate_readme():
    conn = connect_db(DB_PATH)
    try:
        counts = _fetch_counts(conn)
        preview_table_md = _fetch_top_preview(conn)
    finally:
        conn.close()

    now = datetime.utcnow().strftime("%Y-%m-%d %H:%M UTC")

    # Build README text as a list of lines
    lines = []
    lines.append("# Musical Tracks Database â€“ Core Build\n")
    lines.append("A clean, **linear** SQLite build for track/artist/album ingestion.\n")
    lines.append(f"- **Author**: Gabe M Chavez")
    lines.append(f"- **Last Updated**: {now}")
    lines.append(f"- **Tracks loaded**: {counts['tracks']}\n")
    lines.append("---\n")
    lines.append("## Quick Start (Colab)")
    lines.append("1. Run Cells 1-8 in order.")
    lines.append(f"2. Put your input data into: `{RAW_PATH}`")
    lines.append("3. Ingest and check:")
    lines.append("    ```python")
    lines.append("    stats = ingest_all_in_raw()")
    lines.append("    sanity_report()")
    lines.append("    run_assignment_queries(limit=3)")
    lines.append("    ```\n")
    lines.append("---\n")
    lines.append("## Sample Joined Rows (Top 5)\n")
    lines.append(preview_table_md + "\n")
    lines.append("---\n")
    lines.append("## Schema")
    lines.append("```text")
    lines.append(SCHEMA_ASCII.strip())
    lines.append("```\n")
    lines.append("---\n")
    lines.append(f"## Project Structure\n`{WORKDIR}`\n")
    lines.append("- `musicdb.sqlite` â€” Final SQLite database")
    lines.append("- `raw/` â€” Input CSV/JSON files")
    lines.append("- `exports/` â€” CSV outputs from Cell 10")
    lines.append("- `README.md` â€” This file\n")

    # Write to file
    README_PATH.write_text("\n".join(lines), encoding="utf-8")
    print("README generated at:", README_PATH)

print("README generator ready. Call generate_readme().")


README generator ready. Call generate_readme().


Cell 12 â€” Notebook Wrap-Up

In [None]:
# ==========================================================
# Cell 12 â€” Notebook Wrap-Up
# Purpose: Final message to wrap up the notebook for viewers and collaborators.
# ==========================================================

def notebook_wrap_up():
    print("""
ðŸŽµ Musical Tracks Database â€” Notebook Complete!

âœ… Phase 1 (Core Build): Done
   - Schema created
   - Data ingested
   - Assignment joins verified

âœ… Phase 2 (Portfolio Enhancements): Done
   - Analytics & insights
   - Exported CSV summaries
   - Auto-generated README.md

Next steps (optional):
- Add a visual schema or ERD
- Extend analytics or add DJ-focused fields (BPM, Key)
- Package this into a CLI tool or Python module

Thank you for exploring this project.
Feel free to fork, extend, or remix it for your own use cases! ðŸš€
""")

print("Wrap-up cell ready. Call notebook_wrap_up().")


Wrap-up cell ready. Call notebook_wrap_up().


Cell 13 â€” Completion & Closing Message

In [None]:
notebook_wrap_up()



ðŸŽµ Musical Tracks Database â€” Notebook Complete!

âœ… Phase 1 (Core Build): Done  
   - Schema created  
   - Data ingested  
   - Assignment joins verified

âœ… Phase 2 (Portfolio Enhancements): Done  
   - Analytics & insights  
   - Exported CSV summaries  
   - Auto-generated README.md  

Next steps (optional):
- Add a visual schema or ERD
- Extend analytics or add DJ-focused fields (BPM, Key)
- Package this into a CLI tool or Python module

Thank you for exploring this project.
Feel free to fork, extend, or remix it for your own use cases! ðŸš€

