<a href="https://colab.research.google.com/github/npd00/anime_discovery_engine/blob/main/anime_discovery_engine.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Personal Anime Discovery Engine

**Objective:** Ingest Trakt.tv watch history, enrich it with MyAnimeList metadata (via the Jikan API) and Gemini-generated genres, analyze viewing habits, and generate personalized anime recommendations using Google Gemini.

In [None]:
# Step 1: Configuration & Path Discovery
import os
import glob
import ipywidgets as widgets
from IPython.display import display

# Determine environment (Colab vs. local). PROJECT_DIR starts as '.' so it is
# still defined if the Colab branch fails part-way with a non-ImportError.
PROJECT_DIR = '.'
try:
    from google.colab import drive
    from google.colab import userdata  # read by the Gemini setup cell for the API key
    drive.mount('/content/drive')
    PROJECT_DIR = '/content/drive/MyDrive/Trakt_Project'
    print("✅ Running in Google Colab")
except ImportError:
    PROJECT_DIR = os.getcwd()
    print(f"✅ Running Locally at: {PROJECT_DIR}")

DB_PATH = os.path.join(PROJECT_DIR, 'trakt_data.duckdb')
# Intended cap on history rows used as recommendation context.
MAX_CONTEXT_ITEMS = 50

# Ensure project directory exists
os.makedirs(PROJECT_DIR, exist_ok=True)
print(f"Project Directory: {PROJECT_DIR}")

# --- Interactive Configuration ---
ALL_SERVICES = ['NETFLIX', 'CRUNCHYROLL', 'HIDIVE', 'HULU', 'DISNEY+', 'PRIME VIDEO', 'HBO MAX']
# Services ticked by default (selected by name so reordering ALL_SERVICES is safe).
DEFAULT_SERVICES = {'NETFLIX', 'CRUNCHYROLL', 'HIDIVE'}
service_checkboxes = [widgets.Checkbox(value=s in DEFAULT_SERVICES, description=s) for s in ALL_SERVICES]
services_ui = widgets.GridBox(service_checkboxes, layout=widgets.Layout(grid_template_columns="repeat(3, 200px)"))

ALL_GENRES = ['Action', 'Adventure', 'Cyberpunk', 'Isekai', 'Psychological Thriller', 'Mecha', 'Slice of Life', 'Fantasy', 'Sci-Fi', 'Horror', 'Romance', 'Sports', 'Mystery']
genre_selector = widgets.SelectMultiple(options=ALL_GENRES, value=['Cyberpunk', 'Isekai'], rows=6, description='Genres:')

mood_selector = widgets.Dropdown(options=['Any', 'Chill', 'Dark', 'Hype', 'Emotional', 'Complex'], value='Any', description='Mood:')
discovery_mode = widgets.Dropdown(options=['Balanced', 'Safe Bets', 'Hidden Gems', 'Experimental'], value='Balanced', description='Mode:')
time_commitment = widgets.Dropdown(options=['Any', 'Movie', 'Short (<13)', 'Medium (24)', 'Long'], value='Any', description='Length:')

display(services_ui)
display(widgets.HBox([genre_selector, widgets.VBox([mood_selector, discovery_mode, time_commitment])]))

In [None]:
# Step 1.5: DEBUG PATHS
# Quick sanity check that Trakt history exports are reachable: search
# PROJECT_DIR recursively and report how many files match.
print("--- PATH DEBUGGER ---")
print(f"Base: {PROJECT_DIR}")
search_pattern = os.path.join(PROJECT_DIR, '**', 'watched-history-*.json')
found_files = list(glob.iglob(search_pattern, recursive=True))
print("Files found via glob: " + str(len(found_files)))

In [None]:
# Step 2: Imports & Setup
import duckdb
import pandas as pd
import json
import re
import time
import requests
from datetime import datetime
import google.generativeai as genai
from IPython.display import display, clear_output

# Configure Gemini: prefer the Colab secret store, then fall back to the
# GEMINI_API_KEY environment variable. Locally `userdata` was never imported,
# so the first lookup raises NameError and we use the env var instead.
try:
    try:
        GEMINI_API_KEY = userdata.get('GEMINI_API_KEY')
    except Exception:  # NameError locally, or any Colab secret-lookup failure
        GEMINI_API_KEY = None
    if not GEMINI_API_KEY:
        GEMINI_API_KEY = os.environ.get('GEMINI_API_KEY')

    if GEMINI_API_KEY:
        genai.configure(api_key=GEMINI_API_KEY)
        print("Gemini API Configured Successfully.")
    else:
        print("⚠️ Gemini API Key not found. AI features disabled.")
except Exception as e:
    print(f"API Config Error: {e}")

In [None]:
# Step 3: Database Initialization (With Cache Tables)

def init_db():
    """Create the project's DuckDB tables and ID sequence at DB_PATH if missing.

    Tables: USER_WATCH_HISTORY (SCD2-versioned watch history), PLAN_TO_WATCH
    (saved recommendations), JIKAN_CACHE and GENRE_CACHE (enrichment caches),
    plus the seq_plan_id sequence used to number PLAN_TO_WATCH rows. Every
    statement uses IF NOT EXISTS, so re-running the cell is safe.
    """
    con = duckdb.connect(DB_PATH)
    try:
        con.execute("CREATE TABLE IF NOT EXISTS USER_WATCH_HISTORY (TRAKT_ID VARCHAR, TITLE VARCHAR, YEAR INTEGER, USER_RATING INTEGER, MEDIA_TYPE VARCHAR, GENRES VARCHAR, EPISODES_WATCHED INTEGER, TOTAL_EPISODES INTEGER, FIRST_WATCHED_AT TIMESTAMP, LAST_WATCHED_AT TIMESTAMP, WATCH_DURATION_DAYS INTEGER, BINGE_INDICATOR BOOLEAN, STUDIO VARCHAR, MAL_SCORE DOUBLE, VALID_FROM TIMESTAMP, VALID_TO TIMESTAMP, IS_CURRENT BOOLEAN, INGESTION_TIMESTAMP TIMESTAMP)")
        con.execute("CREATE TABLE IF NOT EXISTS PLAN_TO_WATCH (ID BIGINT PRIMARY KEY, TITLE VARCHAR, RECOMMENDED_BY_AI_AT TIMESTAMP, STREAMING_SERVICE VARCHAR, NOTES VARCHAR, PRIORITY VARCHAR, CONFIDENCE_SCORE DOUBLE)")
        con.execute("CREATE TABLE IF NOT EXISTS JIKAN_CACHE (TITLE VARCHAR PRIMARY KEY, MAL_ID INTEGER, DATA_JSON VARCHAR, FETCHED_AT TIMESTAMP)")
        con.execute("CREATE TABLE IF NOT EXISTS GENRE_CACHE (TITLE VARCHAR PRIMARY KEY, GENRES_JSON VARCHAR)")
        con.execute("CREATE SEQUENCE IF NOT EXISTS seq_plan_id START 1")
    finally:
        # Always release the file handle so a failed DDL doesn't leave the DB locked.
        con.close()
    print("Database initialized.")
init_db()

In [None]:
# Step 4: ETL Functions

def clean_title(title):
    """Normalize a title into the uppercase key used across tables and caches.

    Accented letters are folded to their ASCII base (Unicode NFKD), then every
    character except A-Z, 0-9, space and ``-!:&`` is removed and runs of spaces
    are collapsed. Returns "UNKNOWN" for empty input or when nothing survives
    the filter (e.g. a title written only in Japanese script).
    """
    import unicodedata

    if not title: return "UNKNOWN"
    # NFKD splits "é" into "e" + combining accent; the accent is then stripped below.
    folded = unicodedata.normalize('NFKD', str(title)).upper()
    kept = re.sub(r'[^A-Z0-9 \-!:&]', '', folded)
    collapsed = re.sub(r' {2,}', ' ', kept).strip()
    return collapsed or "UNKNOWN"

def _jikan_as_int(value):
    """Return *value* as an int, or None when it is missing, NaN or non-numeric."""
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        return None


def _jikan_release_year(cand):
    """Return a Jikan anime record's year, falling back to aired.prop.from.year."""
    if cand.get('year'):
        return cand['year']
    # Any level may be null in the payload, hence the `or {}` at each step.
    aired_from = ((cand.get('aired') or {}).get('prop') or {}).get('from') or {}
    return aired_from.get('year')


def _jikan_search_top_hit(title, retries=3):
    """Return the first Jikan anime-search hit for *title*, or None.

    Retries on HTTP 429 (rate limited) with a growing back-off; any other
    non-200 status yields None. Network errors propagate to the caller.
    """
    for attempt in range(retries):
        try:
            resp = requests.get(
                "https://api.jikan.moe/v4/anime",
                params={'q': title, 'limit': 1},  # requests URL-encodes '&', ':' etc.
                timeout=15,
            )
        finally:
            time.sleep(1.05)  # Rate limit: pause after every request, even failed ones
        if resp.status_code == 429:
            time.sleep(2 * (attempt + 1))
            continue
        if resp.status_code != 200:
            return None
        hits = resp.json().get('data') or []
        return hits[0] if hits else None
    return None


def enrich_with_jikan(items_list, con=None):
    """Look up MyAnimeList metadata for titles via the Jikan v4 API, with caching.

    Args:
        items_list: list of dicts with 'TITLE' (cleaned title) and 'YEAR' keys.
        con: optional open DuckDB connection to reuse; when None a connection
            to DB_PATH is opened here and closed before returning.

    Returns:
        dict mapping TITLE -> {'MAL_SCORE', 'STUDIO', 'TOTAL_EPISODES'} for every
        title found in JIKAN_CACHE or fetched successfully. A fetched top hit
        whose year is more than one year from the Trakt YEAR is rejected and
        the title is left out. STUDIO is "Unknown" when no studio is listed.
    """
    if not items_list: return {}
    print(f"\n🔎 Enriching {len(items_list)} titles via Jikan...")
    should_close = con is None
    if should_close:
        con = duckdb.connect(DB_PATH)
    try:
        # 1. Bulk load cache (a missing table just means an empty cache).
        try:
            cached_rows = con.execute("SELECT TITLE, DATA_JSON FROM JIKAN_CACHE").fetchall()
        except Exception:
            cached_rows = []
        cache_map = {}
        for cached_title, data_json in cached_rows:
            try:
                parsed = json.loads(data_json)
            except (TypeError, ValueError):
                continue  # unreadable cache row: treat as a miss and refetch
            if isinstance(parsed, dict):
                cache_map[cached_title] = parsed

        # 2. Identify delta
        results = {}
        titles_to_process = []
        for item in items_list:
            t = item['TITLE']
            if t in cache_map:
                results[t] = cache_map[t]
            else:
                titles_to_process.append(item)
        print(f"   - Cached: {len(results)} | New to Fetch: {len(titles_to_process)}")

        # 3. Fetch missing
        for item in titles_to_process:
            title = item['TITLE']
            if title in results:
                continue  # same title already resolved for another YEAR
            try:
                cand = _jikan_search_top_hit(title)
                if not cand:
                    continue
                trakt_year = _jikan_as_int(item.get('YEAR'))
                jikan_year = _jikan_as_int(_jikan_release_year(cand))
                # A hit more than a year off is most likely a different show/season.
                if trakt_year is not None and jikan_year is not None and abs(trakt_year - jikan_year) > 1:
                    continue
                results[title] = cand
                con.execute(
                    "INSERT OR REPLACE INTO JIKAN_CACHE VALUES (?, ?, ?, ?)",
                    (title, cand.get('mal_id'), json.dumps(cand), datetime.now()),
                )
            except Exception as e:
                print(f"Err {title}: {e}")

        # 4. Format output
        final_map = {}
        for t, data in results.items():
            studios = data.get('studios') or []
            first_studio = studios[0] if studios and isinstance(studios[0], dict) else {}
            final_map[t] = {
                'MAL_SCORE': data.get('score'),
                'STUDIO': first_studio.get('name') or "Unknown",
                'TOTAL_EPISODES': data.get('episodes'),
            }
        return final_map
    finally:
        if should_close:
            con.close()

def enrich_genres_with_gemini(titles_list, batch_size=20, con=None):
    """Classify titles into 2-3 genres with Gemini, caching results in GENRE_CACHE.

    Args:
        titles_list: cleaned titles to classify.
        batch_size: number of titles sent per Gemini request (values < 1 are
            treated as 1).
        con: optional open DuckDB connection to reuse; when None a connection
            to DB_PATH is opened here and closed before returning.

    Returns:
        dict mapping title -> list of genre strings. It contains every cached
        title (not only the requested ones) plus newly generated entries;
        titles Gemini failed to classify are simply absent.
    """
    if not titles_list: return {}
    print(f"\n🧠 Enriching {len(titles_list)} genres via Gemini...")
    should_close = con is None
    if should_close:
        con = duckdb.connect(DB_PATH)
    try:
        # 1. Bulk load cache (a missing table just means an empty cache).
        try:
            cached_rows = con.execute("SELECT TITLE, GENRES_JSON FROM GENRE_CACHE").fetchall()
        except Exception:
            cached_rows = []
        cache_map = {}
        for cached_title, genres_json in cached_rows:
            try:
                cache_map[cached_title] = json.loads(genres_json)
            except (TypeError, ValueError):
                pass  # unreadable cache row: regenerate it below

        # 2. Identify delta (deduplicated, order preserved, so each title is asked once).
        missing_titles = list(dict.fromkeys(t for t in titles_list if t not in cache_map))
        n_cached = sum(1 for t in titles_list if t in cache_map)
        print(f"   - Cached: {n_cached} | New to Gen: {len(missing_titles)}")
        if not missing_titles:
            return cache_map

        # 3. Generate missing
        try:
            model = genai.GenerativeModel('gemini-2.5-flash')
        except Exception as e:
            print(f"Gen Error: {e}")
            return cache_map  # fallback: cached genres only

        step = max(1, int(batch_size))
        for start in range(0, len(missing_titles), step):
            batch = missing_titles[start:start + step]
            # The model may not echo titles byte-for-byte; match keys case-insensitively.
            requested = {t.strip().upper(): t for t in batch}
            prompt = (
                "Classify each anime title into 2-3 genres. "
                f"Titles: {json.dumps(batch)} "
                "Output STRICT JSON only (double quotes, no markdown): an object mapping "
                'each title exactly as given to a list of genres, e.g. {"TITLE": ["Genre1", "Genre2"]}'
            )
            try:
                response = model.generate_content(prompt)
                text = response.text.strip().replace('```json', '').replace('```', '').strip()
                new_data = json.loads(text)
                if not isinstance(new_data, dict):
                    raise ValueError(f"expected a JSON object, got {type(new_data).__name__}")
                for key, genres in new_data.items():
                    title = requested.get(str(key).strip().upper())
                    if title is None or not isinstance(genres, list):
                        continue  # unrequested title or malformed entry
                    genres = [str(g) for g in genres]
                    cache_map[title] = genres
                    con.execute("INSERT OR REPLACE INTO GENRE_CACHE VALUES (?, ?)", (title, json.dumps(genres)))
            except Exception as e:
                print(f"Gen Error: {e}")
            time.sleep(1)  # throttle between batches, including after a failure
        return cache_map
    finally:
        if should_close:
            con.close()

def calculate_viewing_stats(df):
    """Summarise per-title viewing behaviour from individual watch events.

    Args:
        df: one row per watch event, with a 'TRAKT_ID' column and a datetime
            'watched_at' column.

    Returns:
        DataFrame with one row per TRAKT_ID: EPISODES_WATCHED (event count),
        FIRST_WATCHED_AT, WATCH_DURATION_DAYS (whole days between the first
        and last event) and BINGE_INDICATOR (more than 3 events overall and
        more than 3 falling into a single daily resample bin).
    """
    def summarise(trakt_id, events):
        stamps = events['watched_at']
        n_events = len(events)
        earliest, latest = stamps.min(), stamps.max()
        # Short-circuit keeps the resample off titles with <= 3 events.
        binge = n_events > 3 and events.set_index('watched_at').resample('D').size().max() > 3
        return {
            'TRAKT_ID': trakt_id,
            'EPISODES_WATCHED': n_events,
            'FIRST_WATCHED_AT': earliest,
            'WATCH_DURATION_DAYS': (latest - earliest).days,
            'BINGE_INDICATOR': binge,
        }

    summaries = [
        summarise(tid, grp.sort_values('watched_at'))
        for tid, grp in df.groupby('TRAKT_ID')
    ]
    return pd.DataFrame(summaries)

def load_trakt_files(pattern_list, base_dir=None):
    """Load and concatenate Trakt export records from JSON files.

    Each pattern is searched recursively under ``base_dir`` (defaults to the
    notebook's PROJECT_DIR). Only files whose top-level JSON value is a list
    are merged; other files are ignored, and unreadable or invalid files are
    reported and skipped.

    Args:
        pattern_list: glob file-name patterns, e.g. ['watched-history-*.json'].
        base_dir: directory to search; None means PROJECT_DIR.

    Returns:
        list: records from every matching list-shaped file, in sorted path order
        per pattern.
    """
    root = PROJECT_DIR if base_dir is None else base_dir
    all_data = []
    for pattern in pattern_list:
        # Escape the root so '[', '*', '?' in folder names are matched literally.
        search_path = os.path.join(glob.escape(root), '**', pattern)
        files = sorted(glob.glob(search_path, recursive=True))
        print(f"Searching '{pattern}'... Found {len(files)}")
        for f_path in files:
            try:
                # Explicit UTF-8: the platform default (e.g. cp1252 on Windows)
                # would mangle or reject non-ASCII titles.
                with open(f_path, 'r', encoding='utf-8') as f:
                    content = json.load(f)
            except (OSError, ValueError) as e:  # ValueError covers JSON/Unicode decode errors
                print(f"Error loading {f_path}: {e}")
                continue
            if isinstance(content, list):
                all_data.extend(content)
    return all_data

def _etl_extract_media_fields(raw_df):
    """Flatten each record's nested 'movie'/'show' dict into title/year/id/type columns.

    A 'movie' dict takes precedence over a 'show' dict. 'id' becomes the nested
    ids.trakt value as a string and overwrites any top-level 'id' column already
    present; 'type' likewise becomes 'movie' or 'show'. Records with neither
    dict, or without a Trakt id, are dropped.
    """
    def column(name):
        # An export with no movies (or no shows) simply lacks that column.
        return raw_df[name].tolist() if name in raw_df.columns else [None] * len(raw_df)

    media, kinds = [], []
    for movie, show in zip(column('movie'), column('show')):
        if isinstance(movie, dict):
            media.append(movie)
            kinds.append('movie')
        elif isinstance(show, dict):
            media.append(show)
            kinds.append('show')
        else:
            media.append({})
            kinds.append(None)

    out = raw_df.copy()
    out['title'] = [m.get('title') for m in media]
    out['year'] = [m.get('year') for m in media]
    trakt_ids = [(m.get('ids') or {}).get('trakt') for m in media]
    # str() per element keeps integer ids as '123' (a float column would give '123.0').
    out['id'] = [None if i is None else str(i) for i in trakt_ids]
    out['type'] = kinds
    return out[out['id'].notna()].copy()


def _etl_values_differ(new, old):
    """Element-wise change test where two missing values (None/NaN/NA) count as equal."""
    flags = []
    for a, b in zip(new, old):
        a_missing, b_missing = pd.isna(a), pd.isna(b)
        if a_missing or b_missing:
            flags.append(bool(a_missing != b_missing))
        else:
            flags.append(bool(a != b))
    return pd.Series(flags, index=new.index, dtype=bool)


def run_etl_pipeline():
    """Load Trakt exports, enrich them, and upsert into USER_WATCH_HISTORY (SCD type 2).

    Steps: (1) read history/ratings JSON under PROJECT_DIR; (2) collapse
    history to one row per Trakt ID with viewing stats; (3) enrich with Jikan
    metadata and Gemini genres (both cached); (4) close out current rows whose
    rating or episode count changed and insert new/changed versions.
    Errors are printed, not raised; the DB connection is always closed.
    """
    print(f"🚀 Starting ETL in {PROJECT_DIR}...")
    conn = duckdb.connect(DB_PATH)
    try:
        # 1. Load Data
        history_data = load_trakt_files(['watched-history-*.json', 'watched-movies.json'])
        ratings_data = load_trakt_files(['ratings-shows.json', 'ratings-movies.json'])
        if not history_data:
            print("No watch history found; nothing to load.")
            return

        rating_map = {}
        for r in ratings_data:
            item = r.get('show') or r.get('movie')
            trakt_id = ((item or {}).get('ids') or {}).get('trakt')
            if trakt_id is not None and r.get('rating'):
                rating_map[str(trakt_id)] = r['rating']

        # 2. Process History
        raw_df = _etl_extract_media_fields(pd.DataFrame(history_data))
        if raw_df.empty:
            print("No usable movie/show records in watch history.")
            return

        # Rows without 'watched_at' fall back to 'last_watched_at'
        # (presumably the watched-movies.json rows -- confirm against the export).
        if 'last_watched_at' in raw_df.columns:
            if 'watched_at' in raw_df.columns:
                raw_df['watched_at'] = raw_df['watched_at'].fillna(raw_df['last_watched_at'])
            else:
                raw_df['watched_at'] = raw_df['last_watched_at']

        raw_prep = raw_df.rename(columns={'id': 'TRAKT_ID', 'title': 'TITLE', 'year': 'YEAR', 'type': 'MEDIA_TYPE'})
        raw_prep['TRAKT_ID'] = raw_prep['TRAKT_ID'].astype(str)
        raw_prep['watched_at'] = pd.to_datetime(raw_prep['watched_at'])
        raw_prep['TITLE'] = raw_prep['TITLE'].apply(clean_title)
        raw_prep['USER_RATING'] = raw_prep['TRAKT_ID'].apply(lambda x: rating_map.get(x, None))

        stats_df = calculate_viewing_stats(raw_prep)
        staging_base = raw_prep.sort_values('watched_at', ascending=False).drop_duplicates('TRAKT_ID')
        staging_df = staging_base.merge(stats_df, on='TRAKT_ID', how='left')
        staging_df = staging_df.rename(columns={'watched_at': 'LAST_WATCHED_AT'})

        # 3. Optimized Enrichments (reuse 'conn' so the cache tables are shared).
        # 'UNKNOWN' is clean_title's placeholder, not a real title: never look it up,
        # or every untitled record would inherit one arbitrary search hit.
        unique_items = [
            i for i in staging_df[['TITLE', 'YEAR']].drop_duplicates().to_dict('records')
            if i['TITLE'] != 'UNKNOWN'
        ]
        jikan_map = enrich_with_jikan(unique_items, con=conn)

        unique_titles = list(dict.fromkeys(i['TITLE'] for i in unique_items))
        genre_map = enrich_genres_with_gemini(unique_titles, con=conn)

        staging_df['GENRES'] = staging_df['TITLE'].apply(lambda t: json.dumps(genre_map.get(t, [])))
        staging_df['MAL_SCORE'] = staging_df['TITLE'].apply(lambda t: jikan_map.get(t, {}).get('MAL_SCORE'))
        staging_df['STUDIO'] = staging_df['TITLE'].apply(lambda t: jikan_map.get(t, {}).get('STUDIO'))
        staging_df['TOTAL_EPISODES'] = staging_df['TITLE'].apply(lambda t: jikan_map.get(t, {}).get('TOTAL_EPISODES'))

        # 4. Upsert (SCD2)
        cols = ['TRAKT_ID', 'TITLE', 'USER_RATING', 'MEDIA_TYPE', 'LAST_WATCHED_AT', 'EPISODES_WATCHED', 'FIRST_WATCHED_AT', 'WATCH_DURATION_DAYS', 'BINGE_INDICATOR', 'GENRES', 'MAL_SCORE', 'STUDIO', 'TOTAL_EPISODES', 'YEAR']
        for c in cols:
            if c not in staging_df.columns: staging_df[c] = None

        current_db_df = conn.query("SELECT * FROM USER_WATCH_HISTORY WHERE IS_CURRENT = TRUE").to_df()
        if not current_db_df.empty: current_db_df = current_db_df.astype({'TRAKT_ID': str})

        merged = staging_df.merge(current_db_df, on='TRAKT_ID', how='left', suffixes=('', '_OLD'), indicator=True)
        is_new = merged['_merge'] == 'left_only'
        # Compare against the stored *_OLD values; NaN-safe so unrated titles
        # are not flagged as changed on every run.
        is_changed = ~is_new & (
            _etl_values_differ(merged['USER_RATING'], merged['USER_RATING_OLD'])
            | _etl_values_differ(merged['EPISODES_WATCHED'], merged['EPISODES_WATCHED_OLD'])
        )
        new_records = merged[is_new].copy()
        changed_records = merged[is_changed].copy()

        now = datetime.now()
        if not changed_records.empty:
            ids = changed_records['TRAKT_ID'].tolist()
            placeholders = ', '.join(['?'] * len(ids))
            conn.execute(
                f"UPDATE USER_WATCH_HISTORY SET VALID_TO=?, IS_CURRENT=FALSE "
                f"WHERE TRAKT_ID IN ({placeholders}) AND IS_CURRENT=TRUE",
                [now, *ids],
            )

        to_insert = pd.concat([new_records, changed_records])
        if not to_insert.empty:
            final_df = to_insert[cols].copy()
            final_df['VALID_FROM'] = now; final_df['VALID_TO'] = None; final_df['IS_CURRENT'] = True; final_df['INGESTION_TIMESTAMP'] = now
            c_order = ['TRAKT_ID', 'TITLE', 'YEAR', 'USER_RATING', 'MEDIA_TYPE', 'GENRES', 'EPISODES_WATCHED', 'TOTAL_EPISODES', 'FIRST_WATCHED_AT', 'LAST_WATCHED_AT', 'WATCH_DURATION_DAYS', 'BINGE_INDICATOR', 'STUDIO', 'MAL_SCORE', 'VALID_FROM', 'VALID_TO', 'IS_CURRENT', 'INGESTION_TIMESTAMP']
            final_df = final_df.reindex(columns=c_order)
            # DuckDB resolves `final_df` from this function's local variables.
            conn.execute("INSERT INTO USER_WATCH_HISTORY SELECT * FROM final_df")
            print(f"Inserted {len(final_df)} new/updated records.")
        else: print("No new changes detected.")
    except Exception as e: print(f"ETL Error: {e}")
    finally: conn.close()

run_etl_pipeline()

In [None]:
# Step 5: DATA VERIFICATION
print("--- DATABASE AUDIT ---")
con = duckdb.connect(DB_PATH)
try:
    # Row counts: current (IS_CURRENT) history versions plus both enrichment caches.
    count = con.execute("SELECT COUNT(*) FROM USER_WATCH_HISTORY WHERE IS_CURRENT=TRUE").fetchone()[0]
    print(f"🎥 Active Watch History Records: {count}")
    j_count = con.execute("SELECT COUNT(*) FROM JIKAN_CACHE").fetchone()[0]
    g_count = con.execute("SELECT COUNT(*) FROM GENRE_CACHE").fetchone()[0]
    print(f"💾 Cache Stats: Jikan={j_count} | Genres={g_count}")

    if count > 0:
        print("\nSnapshot (Top 5 Recent):")
        df = con.query("SELECT TITLE, YEAR, USER_RATING, EPISODES_WATCHED, LAST_WATCHED_AT FROM USER_WATCH_HISTORY WHERE IS_CURRENT=TRUE ORDER BY LAST_WATCHED_AT DESC LIMIT 5").to_df()
        display(df)
    else:
        print("⚠️ Table is empty! ETL failed.")
except Exception as e: print(f"Audit Error: {e}")
finally: con.close()

In [None]:
# Step 6: AI Engine
def get_ai_recommendations():
    """Ask Gemini for anime recommendations from watch history plus the UI settings.

    Context is up to MAX_CONTEXT_ITEMS current titles rated >= 8, ordered
    binge-watched first, then most recently watched, combined with the
    service/genre/mood/mode/length widgets from Step 1.

    Returns:
        list[dict]: parsed recommendation objects (the prompt asks for keys
        'title', 'reasoning', 'service', 'ai_confidence'); [] when preferences
        are incomplete or the query, API call or JSON parsing fails.
    """
    # Check UI preferences first so we don't hit the DB or API needlessly.
    selected_services = [chk.description for chk in service_checkboxes if chk.value]
    selected_genres = list(genre_selector.value)
    if not selected_services or not selected_genres:
        print("⚠️ Configure preferences above.")
        return []
    current_mood = mood_selector.value
    mode = discovery_mode.value
    length_pref = time_commitment.value

    con = duckdb.connect(DB_PATH)
    try:
        context_df = con.query(
            "SELECT TITLE, USER_RATING, BINGE_INDICATOR, STUDIO FROM USER_WATCH_HISTORY "
            "WHERE IS_CURRENT = TRUE AND USER_RATING >= 8 "
            "ORDER BY BINGE_INDICATOR DESC, LAST_WATCHED_AT DESC "
            f"LIMIT {int(MAX_CONTEXT_ITEMS)}"
        ).to_df()
    except Exception as e:
        print(f"History Query Error: {e}")
        return []
    finally:
        con.close()

    favorites = context_df['TITLE'].tolist()
    # "Unknown" is the enrichment placeholder, not a real studio.
    studios = [s for s in context_df['STUDIO'].dropna().unique().tolist() if s != "Unknown"][:3]

    prompt = f"""
    Role: Anime Recommender AI.
    Output STRICT JSON only (a JSON array, double-quoted keys and strings, no markdown):
    [{{"title": "...", "reasoning": "...", "service": "...", "ai_confidence": 0.95}}]

    User Profile:
    - Favorites: {', '.join(favorites) or '(none rated 8+ yet)'}
    - Studios: {', '.join(studios)}
    - Services: {', '.join(selected_services)}
    - Genres: {', '.join(selected_genres)}

    Context:
    - Mood: {current_mood}
    - Length: {length_pref}
    - Mode: {mode}
    """

    print("Asking Gemini...")
    try:
        model = genai.GenerativeModel('gemini-2.5-flash')
        response = model.generate_content(prompt)
        text = response.text.strip().replace('```json', '').replace('```', '').strip()
        recs = json.loads(text)
    except Exception as e:
        print(f"AI Error: {e}")
        return []

    if isinstance(recs, dict):
        recs = [recs]  # a single object instead of an array
    if not isinstance(recs, list):
        print(f"AI Error: unexpected response type {type(recs).__name__}")
        return []
    return [r for r in recs if isinstance(r, dict)]

In [None]:
# Step 7: UI
def save_to_watchlist(title, service, notes, priority, confidence=None):
    """Persist one recommendation as a PLAN_TO_WATCH row.

    The row ID is drawn from the seq_plan_id sequence and the timestamp is
    the current local time. Failures are printed rather than raised so the
    calling button handler keeps working; the connection is always closed.
    """
    db = duckdb.connect(DB_PATH)
    try:
        (plan_id,) = db.execute("SELECT nextval('seq_plan_id')").fetchone()
        row = (plan_id, title, datetime.now(), service, notes, priority, confidence)
        db.execute(
            "INSERT INTO PLAN_TO_WATCH (ID, TITLE, RECOMMENDED_BY_AI_AT, STREAMING_SERVICE, NOTES, PRIORITY, CONFIDENCE_SCORE) VALUES (?, ?, ?, ?, ?, ?, ?)",
            row,
        )
        print(f"\n[Saved] {title} ({priority} Priority)")
    except Exception as e:
        print(f"Error saving: {e}")
    finally:
        db.close()

def display_recommendations():
    """Render Gemini recommendations as cards with a priority picker and Save button.

    Model output is untrusted text, so title, service and reasoning are
    HTML-escaped before being placed in HTML widgets. Non-dict entries are
    skipped, and a non-numeric confidence is saved as None.
    """
    import html

    recs = get_ai_recommendations()
    cards = [item for item in (recs or []) if isinstance(item, dict)]
    if not cards: return
    rows = []
    for i, item in enumerate(cards):
        t, s, r, c = item.get('title'), item.get('service'), item.get('reasoning'), item.get('ai_confidence', 0.85)
        try:
            c = float(c)  # CONFIDENCE_SCORE is a DOUBLE column
        except (TypeError, ValueError):
            c = None
        l1 = widgets.HTML(f"<b>{i+1}. {html.escape(str(t))}</b> <span style='color:gray'>({html.escape(str(s))})</span>")
        # HTML rather than Label so long reasoning wraps instead of being truncated.
        l2 = widgets.HTML(f"<div style='white-space: normal'>{html.escape(str(r))}</div>")
        p_dropdown = widgets.Dropdown(options=['High', 'Medium', 'Low'], value='Medium', description='Priority:', layout=widgets.Layout(width='200px'))
        btn = widgets.Button(description='Save', icon='plus', button_style='success')
        # Default args bind this iteration's values (avoids late-binding closures).
        def on_click(b, t=t, s=s, r=r, c=c, p=p_dropdown):
            save_to_watchlist(t, s, r, p.value, c)
            b.icon = 'check'; b.description = 'Saved'; b.disabled = True
        btn.on_click(on_click)
        rows.append(widgets.VBox([l1, l2, widgets.HBox([p_dropdown, btn]), widgets.HTML("<hr>")]))
    display(widgets.VBox(rows))

display_recommendations()