In [1]:
import os, json, random, base64
from pathlib import Path
import pandas as pd
from mutagen import File as MF
import streamlit as st


In [6]:
from pathlib import Path
ROOT = Path.cwd()  # this defines the working directory
ASSETS = ROOT / "assets"
ASSETS.mkdir(exist_ok=True)


In [8]:
DB_PATH = ROOT / "music.db"


In [10]:
# --- Setup base paths ---
from pathlib import Path
ROOT = Path.cwd()
ASSETS = ROOT / "assets"
ASSETS.mkdir(exist_ok=True)

# --- Supported extensions ---
EXTS = [".mp3", ".wav", ".flac", ".ogg"]


In [12]:
from pathlib import Path
ROOT = Path.cwd()
ASSETS = ROOT / "assets"; ASSETS.mkdir(exist_ok=True)
EXTS = [".mp3", ".wav", ".flac", ".ogg", ".m4a", ".aac"]


In [14]:
# Core imports
import sqlite3, time, json
from pathlib import Path

# Safe metadata reader (mutagen is optional)
try:
    from mutagen import File as MF
    HAVE_MUTAGEN = True
except Exception:
    HAVE_MUTAGEN = False


In [16]:
# Database path
DB_PATH = ROOT / "music.db"

def get_conn():
    con = sqlite3.connect(DB_PATH)
    con.row_factory = sqlite3.Row
    return con

def init_db():
    with get_conn() as con:
        con.executescript("""
        PRAGMA journal_mode=WAL;

        CREATE TABLE IF NOT EXISTS tracks (
            id INTEGER PRIMARY KEY,
            path TEXT UNIQUE NOT NULL,
            title TEXT,
            artist TEXT,
            album TEXT,
            duration REAL DEFAULT 0,
            added_at INTEGER
        );

        CREATE TABLE IF NOT EXISTS playlists (
            id INTEGER PRIMARY KEY,
            name TEXT UNIQUE NOT NULL,
            created_at INTEGER
        );

        CREATE TABLE IF NOT EXISTS playlist_items (
            playlist_id INTEGER,
            track_id INTEGER,
            position INTEGER,
            PRIMARY KEY (playlist_id, track_id),
            FOREIGN KEY (playlist_id) REFERENCES playlists(id) ON DELETE CASCADE,
            FOREIGN KEY (track_id) REFERENCES tracks(id) ON DELETE CASCADE
        );

        CREATE INDEX IF NOT EXISTS idx_tracks_text ON tracks(title,artist,album);
        """)
    return True

def upsert_track(row):
    with get_conn() as con:
        con.execute(
            """
            INSERT INTO tracks(path,title,artist,album,duration,added_at)
            VALUES(?,?,?,?,?,?)
            ON CONFLICT(path) DO UPDATE SET
                title=excluded.title,
                artist=excluded.artist,
                album=excluded.album,
                duration=excluded.duration
            """,
            (
                row["path"],
                row.get("title")  or "Unknown",
                row.get("artist") or "Unknown",
                row.get("album")  or "Unknown",
                float(row.get("duration", 0) or 0),
                int(time.time()),
            ),
        )

def ensure_playlist(name="All"):
    with get_conn() as con:
        cur = con.execute("SELECT id FROM playlists WHERE name=?", (name,))
        r = cur.fetchone()
        if r: 
            return r["id"]
        cur = con.execute(
            "INSERT INTO playlists(name,created_at) VALUES(?,?)",
            (name, int(time.time())),
        )
        return cur.lastrowid

def add_to_playlist(track_path, playlist_name="All"):
    with get_conn() as con:
        pl_id = ensure_playlist(playlist_name)
        t = con.execute("SELECT id FROM tracks WHERE path=?", (track_path,)).fetchone()
        if not t:
            return
        pos = con.execute(
            "SELECT COALESCE(MAX(position),0)+1 FROM playlist_items WHERE playlist_id=?",
            (pl_id,),
        ).fetchone()[0]
        con.execute(
            "INSERT OR REPLACE INTO playlist_items(playlist_id,track_id,position) VALUES(?,?,?)",
            (pl_id, t["id"], pos),
        )

def count_tracks():
    with get_conn() as con:
        return con.execute("SELECT COUNT(*) FROM tracks").fetchone()[0]


In [18]:
def read_tags(path: Path):
    """Return dict with title/artist/album/duration using mutagen if available."""
    title = path.stem
    artist = "Unknown"
    album = "Unknown"
    duration = 0.0

    if HAVE_MUTAGEN:
        try:
            mf = MF(path, easy=True)
            if mf is not None:
                title   = (mf.get("title",  [title])   or [title])[0]
                artist  = (mf.get("artist", ["Unknown"]) or ["Unknown"])[0]
                album   = (mf.get("album",  ["Unknown"]) or ["Unknown"])[0]
                info    = getattr(mf, "info", None)
                if info and getattr(info, "length", None) is not None:
                    duration = float(info.length)
        except Exception:
            pass

    return {
        "path": str(path),
        "title": str(title),
        "artist": str(artist),
        "album": str(album),
        "duration": float(duration),
    }

def scan_assets_to_db():
    """Walk ./assets and upsert everything into SQLite."""
    added = 0
    for p in ASSETS.rglob("*"):
        if p.suffix.lower() in (ext.lower() for ext in EXTS):
            row = read_tags(p)
            upsert_track(row)
            added += 1
    return added


In [20]:
def query_tracks(q: str = "", limit: int = 5000):
    """Return list of dict rows."""
    q = (q or "").strip()
    with get_conn() as con:
        if not q:
            cur = con.execute(
                "SELECT * FROM tracks ORDER BY title COLLATE NOCASE LIMIT ?",
                (limit,),
            )
        else:
            like = f"%{q}%"
            cur = con.execute(
                """
                SELECT * FROM tracks
                WHERE title  LIKE ? OR artist LIKE ? OR album LIKE ?
                ORDER BY title COLLATE NOCASE LIMIT ?
                """,
                (like, like, like, limit),
            )
        return [dict(r) for r in cur.fetchall()]


In [22]:
init_db()
added = scan_assets_to_db()  # scans ./assets
print("Indexed files:", added, "| Total in DB:", count_tracks())


Indexed files: 0 | Total in DB: 0


In [24]:
import pandas as pd

def search_df(q=""):
    rows = query_tracks(q)
    df = pd.DataFrame(rows, columns=["title","artist","album","duration","path"])
    # pretty duration
    def mmss(x):
        try:
            s = int(x)
            return f"{s//60:02d}:{s%60:02d}"
        except Exception:
            return "00:00"
    if not df.empty:
        df["duration"] = df["duration"].map(mmss)
    return df

# Try a search (change the query text)
display(search_df("love"))


Unnamed: 0,title,artist,album,duration,path


In [26]:
def import_library_json_once():
    j = ROOT / "library.json"
    if not j.exists():
        print("No library.json to import.")
        return
    try:
        data = json.loads(j.read_text(encoding="utf-8"))
        for row in data:
            upsert_track({
                "path": row.get("path",""),
                "title": row.get("title","Unknown"),
                "artist": row.get("artist","Unknown"),
                "album": row.get("album","Unknown"),
                "duration": float(row.get("duration",0) or 0),
            })
        print("Imported", len(data), "items from library.json")
    except Exception as e:
        print("Import skipped:", e)

# Run once if you have an old JSON library you want to migrate:
# import_library_json_once()
# print("Total in DB:", count_tracks())


In [28]:
def add_many_to_playlist(paths, name="Favorites"):
    for p in paths:
        add_to_playlist(p, name)
    print(f"Added {len(paths)} items to playlist '{name}'.")

# Example:
# rows = query_tracks("ambient")
# add_many_to_playlist([r["path"] for r in rows], "Ambient")
