### Analysis pipeline (bronze → silver → gold)

In [None]:
import duckdb
import os
import shutil
import signal
import sys
import hashlib


# Module-level handle to the pipeline's open DuckDB connection. run_pipeline
# assigns it (via `global con`) and signal_handler reads it so that an
# interrupt can close the connection; None means "no connection open yet".
con = None

def get_file_hash(filepath):
    """Return the hex SHA-256 digest of the complete contents of *filepath*.

    The file is read in binary mode in 64 KiB pieces, so large inputs are
    hashed without being loaded into memory at once. The digest identifies a
    file by its content (not its name), which is what the ingestion log keys on.
    """
    digest = hashlib.sha256()
    with open(filepath, "rb") as stream:
        # iter(callable, sentinel) keeps reading until read() returns b"" (EOF).
        for piece in iter(lambda: stream.read(65536), b""):
            digest.update(piece)
    return digest.hexdigest()

def signal_handler(sig, frame):
    """Close the pipeline's DuckDB connection and terminate on an interrupt.

    Installed for SIGINT right below this definition.

    Args:
        sig: Signal number delivered by the ``signal`` module.
        frame: Current stack frame (unused).

    Raises:
        SystemExit: always, with status ``128 + sig`` (130 for SIGINT), the
            conventional "terminated by signal" code. Exiting with 0 would make
            a half-finished pipeline look successful to shells and schedulers.
    """
    global con
    print("\n[!] Interrupt received. Closing DuckDB safely...")
    if con is not None:
        con.close()
        # Drop the stale reference so run_pipeline's `finally` does not try to
        # close the same connection a second time.
        con = None
    sys.exit(128 + sig)


signal.signal(signal.SIGINT, signal_handler)

def _sql_literal(value):
    """Return *value* as a single-quoted SQL string literal.

    File paths are written directly into the SQL text of COPY / read_json /
    read_parquet statements, so embedded single quotes are doubled to keep a
    path such as ``O'Brien/listens.txt`` from breaking the statement.
    """
    return "'" + str(value).replace("'", "''") + "'"


def _reset_outputs(db_path, bronze_dir, gold_dir):
    """Delete the bronze/gold output trees and the DuckDB database files."""
    print("[!] Resetting environment...")
    for directory in (bronze_dir, gold_dir):
        if os.path.exists(directory):
            shutil.rmtree(directory)
    # Also remove DuckDB's write-ahead log: a stale `<db>.wal` left by a crashed
    # run could otherwise be picked up by the freshly created database.
    for path in (db_path, db_path + ".wal"):
        if os.path.exists(path):
            os.remove(path)


def _ingest_bronze(connection, input_dir, bronze_dir):
    """Write every not-yet-ingested ``.txt`` file in *input_dir* to the bronze Parquet tree.

    Files are identified by content hash (recorded in ``processed_files``), so a
    renamed copy of an already ingested file is skipped.
    """
    connection.execute("CREATE TABLE IF NOT EXISTS processed_files (content_hash VARCHAR PRIMARY KEY, filename VARCHAR, processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)")
    processed_hashes = {row[0] for row in connection.execute("SELECT content_hash FROM processed_files").fetchall()}

    # Sorted so that ingestion order (and console output) is reproducible.
    all_files = sorted(f for f in os.listdir(input_dir) if f.lower().endswith('.txt'))
    for file_name in all_files:
        full_path = os.path.join(input_dir, file_name)
        file_hash = get_file_hash(full_path)
        if file_hash in processed_hashes:
            continue

        print(f"-> Bronze: Ingesting {file_name}")
        # With the default `data_{i}` names, writing a second input file into the
        # same user_name partition overwrites the first file's Parquet. Prefixing
        # names with the content hash keeps each input's output separate, and a
        # retry of the same input (e.g. after a crash before the INSERT below)
        # overwrites its own files instead of duplicating them.
        file_pattern = _sql_literal(f"data_{file_hash[:16]}_{{i}}")
        connection.execute(f"""
            COPY (SELECT *, (track_metadata->>'track_name') as track_name, (track_metadata->>'artist_name') as artist_name
            FROM read_json({_sql_literal(full_path)}, format='newline_delimited',
            columns={{'user_name': 'VARCHAR', 'listened_at': 'BIGINT', 'recording_msid': 'VARCHAR', 'track_metadata': 'JSON'}}))
            TO {_sql_literal(bronze_dir)} (FORMAT 'PARQUET', PARTITION_BY (user_name), OVERWRITE_OR_IGNORE 1, FILENAME_PATTERN {file_pattern})
        """)
        connection.execute("INSERT INTO processed_files (content_hash, filename) VALUES (?, ?)", [file_hash, file_name])
        # Remember it for the rest of this run too: a second file with identical
        # content would otherwise hit the PRIMARY KEY on content_hash.
        processed_hashes.add(file_hash)


def _sync_silver(connection, bronze_dir):
    """Insert bronze rows into ``silver_listens``, skipping (user_name, listened_at) duplicates."""
    connection.execute("""
        CREATE TABLE IF NOT EXISTS silver_listens (
            user_name VARCHAR, listened_at BIGINT, recording_msid VARCHAR,
            artist_name VARCHAR, track_name VARCHAR, listened_date DATE,
            UNIQUE(user_name, listened_at)
        )
    """)

    with os.scandir(bronze_dir) as entries:
        has_bronze_data = any(entries)
    if not has_bronze_data:
        return

    print("-> Silver: Syncing and Deduplicating (Seconds Scale)...")
    # - user_name exists only as the hive partition directory, so partitioning is
    #   enabled explicitly and the type pinned to VARCHAR (auto-detection would
    #   turn purely numeric user names into integers).
    # - to_timestamp() yields TIMESTAMP WITH TIME ZONE, whose ::DATE cast uses
    #   the session time zone; converting at UTC first makes listened_date the
    #   same on every machine.
    connection.execute(f"""
        INSERT INTO silver_listens (user_name, listened_at, recording_msid, artist_name, track_name, listened_date)
        SELECT user_name, listened_at, recording_msid, artist_name, track_name,
               (to_timestamp(listened_at) AT TIME ZONE 'UTC')::DATE
        FROM read_parquet({_sql_literal(f"{bronze_dir}/**/*.parquet")},
                          hive_partitioning = true, hive_types = {{'user_name': VARCHAR}})
        ON CONFLICT (user_name, listened_at) DO NOTHING
    """)


def _refresh_gold(connection, gold_dir):
    """(Re)create the gold views and export ``gold_user_top_days`` to Parquet.

    Returns:
        The path of the exported Parquet file.
    """
    print("-> Gold: Refreshing Views and Exports...")

    # View 1: Top 10 users by listen count.
    connection.execute("CREATE OR REPLACE VIEW gold_top_10 AS SELECT user_name, COUNT(*) as cnt FROM silver_listens GROUP BY 1 ORDER BY 2 DESC LIMIT 10")

    # View 2: each user's 3 busiest days. listened_date breaks ties in the
    # window so the same days are chosen on every run; the output aliases are
    # quoted so ORDER BY refers to the column aliases, not SQL keywords.
    connection.execute("""
        CREATE OR REPLACE VIEW gold_user_top_days AS
        WITH daily AS (SELECT user_name, listened_date, COUNT(*) as cnt FROM silver_listens GROUP BY 1, 2)
        SELECT user_name as "user", cnt as number_of_listens, listened_date as "date"
        FROM (SELECT *, ROW_NUMBER() OVER(PARTITION BY user_name ORDER BY cnt DESC, listened_date ASC) as r FROM daily)
        WHERE r <= 3 ORDER BY "user" ASC, number_of_listens DESC, "date" ASC
    """)

    # Physical Parquet export for BI tools.
    export_path = f"{gold_dir}/user_peaks.parquet"
    connection.execute(f"COPY gold_user_top_days TO {_sql_literal(export_path)} (FORMAT 'PARQUET')")
    return export_path


def _print_audit(connection, export_path):
    """Print row counts from ``silver_listens`` and the gold export location."""
    s_cnt = connection.execute("SELECT COUNT(*) FROM silver_listens").fetchone()[0]
    v_date = connection.execute("SELECT COUNT(*) FROM silver_listens WHERE listened_date >= '2019-01-01'").fetchone()[0]
    print("-" * 30)
    print(f"Audit Result: {s_cnt} total records.")
    print(f"Valid 2019+ Dates: {v_date}")
    print(f"Gold Export ready at: {export_path}")
    print("-" * 30)


def run_pipeline(input_dir, db_path="scalable.db", bronze_dir="../data/outputs/bronze_listens", gold_dir="../data/outputs/gold_exports", reset=False):
    """Run the bronze → silver → gold listening-data pipeline.

    Steps:
      1. Bronze: newline-delimited JSON ``.txt`` files in *input_dir* whose
         content hash is not yet in ``processed_files`` are written as Parquet
         under *bronze_dir*, partitioned by ``user_name``.
      2. Silver: bronze rows are inserted into ``silver_listens``; rows whose
         (user_name, listened_at) already exist are skipped.
      3. Gold: the ``gold_top_10`` and ``gold_user_top_days`` views are
         recreated and the latter is exported to ``<gold_dir>/user_peaks.parquet``.
      4. Audit: record counts are printed.

    Args:
        input_dir: Directory scanned (non-recursively) for ``.txt`` input files.
        db_path: DuckDB database file.
        bronze_dir: Output directory of the partitioned bronze Parquet files.
        gold_dir: Output directory of the gold Parquet export.
        reset: If True, delete *bronze_dir*, *gold_dir* and the database
            (including its WAL file) before running.

    The open connection is published in the module-level ``con`` so the SIGINT
    handler can close it; it is always closed when this function returns.
    """
    global con
    if reset:
        _reset_outputs(db_path, bronze_dir, gold_dir)

    os.makedirs(bronze_dir, exist_ok=True)
    os.makedirs(gold_dir, exist_ok=True)
    con = duckdb.connect(db_path)

    try:
        con.execute("SET memory_limit = '4GB'; SET threads = 4;")
        _ingest_bronze(con, input_dir, bronze_dir)
        _sync_silver(con, bronze_dir)
        export_path = _refresh_gold(con, gold_dir)
        _print_audit(con, export_path)
    finally:
        if con is not None:
            con.close()
        # Do not leave a closed connection behind for the signal handler.
        con = None

if __name__ == "__main__":
    # Single source of truth for the database location, shared by the pipeline
    # run and the read-only query session below.
    DB_PATH = "scalable.db"
    run_pipeline("../data", db_path=DB_PATH, reset=True)

    # Answer the task queries against the database the pipeline just built.
    print("\n" + "=" * 60)
    print("TASK #2 QUERIES - Using duckdb.query()")
    print("=" * 60)

    # Read-only so the queries cannot modify pipeline tables; the context
    # manager closes the connection even if a query raises.
    with duckdb.connect(DB_PATH, read_only=True) as db:
        # A1: Top 10 users
        print("\nA1. Top 10 users by songs listened to:")
        duckdb.query("SELECT user_name, COUNT(*) AS listen_count FROM silver_listens GROUP BY 1 ORDER BY 2 DESC LIMIT 10", connection=db).show()

        # A2: Users on March 1st, 2019
        print("\nA2. Users on 2019-03-01:")
        duckdb.query("SELECT COUNT(DISTINCT user_name) as user_count FROM silver_listens WHERE listened_date = '2019-03-01'", connection=db).show()

        # A3: First song per user (UNIQUE(user_name, listened_at) on
        # silver_listens rules out ties in the ordering).
        print("\nA3. First song listened to by each user:")
        duckdb.query("""
            SELECT user_name, track_name, artist_name, listened_date
            FROM (SELECT *, ROW_NUMBER() OVER(PARTITION BY user_name ORDER BY listened_at ASC) as rn FROM silver_listens)
            WHERE rn = 1 ORDER BY user_name
        """, connection=db).show()

        # A4: Daily Active Users (7-day rolling window). For each date that has
        # listens, count distinct users active in that date and the 6 before it.
        # DOUBLE rather than FLOAT: FLOAT is single precision in DuckDB.
        print("\nA4. Daily Active Users (7-day window):")
        duckdb.query("""
            WITH daily_users AS (
                SELECT listened_date, user_name 
                FROM silver_listens 
                GROUP BY 1, 2
            ),
            all_dates AS (
                SELECT DISTINCT listened_date FROM silver_listens
            ),
            active_counts AS (
                SELECT 
                    curr.listened_date,
                    COUNT(DISTINCT past.user_name) as number_active_users
                FROM all_dates curr
                LEFT JOIN daily_users past 
                ON past.listened_date BETWEEN (curr.listened_date - INTERVAL 6 DAYS) AND curr.listened_date
                GROUP BY curr.listened_date
            ),
            total_users AS (
                SELECT COUNT(DISTINCT user_name) as total FROM silver_listens
            )
            SELECT 
                listened_date as date,
                number_active_users,
                round((number_active_users::DOUBLE / (SELECT total FROM total_users)) * 100, 2) as percentage_active_users
            FROM active_counts
            ORDER BY date ASC
        """, connection=db).show()

[!] Resetting environment...
-> Gold: Refreshing Views and Exports...
------------------------------
Audit Result: 0 total records.
Valid 2019+ Dates: 0
Gold Export ready at: ../data/outputs/gold_exports/user_peaks.parquet
------------------------------

TASK #2 QUERIES - Using duckdb.query()

A1. Top 10 users by songs listened to:
┌───────────┬──────────────┐
│ user_name │ listen_count │
│  varchar  │    int64     │
├───────────┴──────────────┤
│          0 rows          │
└──────────────────────────┘


A2. Users on 2019-03-01:
┌────────────┐
│ user_count │
│   int64    │
├────────────┤
│          0 │
└────────────┘


A3. First song listened to by each user:
┌───────────┬────────────┬─────────────┬───────────────┐
│ user_name │ track_name │ artist_name │ listened_date │
│  varchar  │  varchar   │   varchar   │     date      │
├───────────┴────────────┴─────────────┴───────────────┤
│                        0 rows                        │
└──────────────────────────────────────────────