<a href="https://colab.research.google.com/github/thetoph/YotoTools/blob/main/YotoPlaylistIconGen.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
# @title 🦁 Yoto Remaster (Smart Prompt Edition)
# @markdown **Upgrade:** Uses `gemini-1.5-flash` to write intelligent, descriptive prompts for each track.
# @markdown **Model:** Uses `imagen-3.0-fast-generate-001` for speed/cost (~$0.02/img).

import os
import time
import requests
import json
import concurrent.futures
from io import BytesIO
from PIL import Image, ImageEnhance

# --- ‚òÅÔ∏è GOOGLE CLOUD CONFIG ---
PROJECT_ID = "YOUR_PROJECT_ID_HERE" # @param {type:"string"}
LOCATION = "us-central1"            # @param {type:"string"}

# --- üîê SECRETS ---
# Inside Colab, read credentials from the notebook's secret store. Anywhere
# else the import fails and the placeholder client ID below is used instead.
try:
    from google.colab import userdata
    YOTO_CLIENT_ID = userdata.get('YOTO_CLIENT_ID')
    try:
        # Optional override: the form value above is kept when the secret is
        # empty or cannot be read.
        PROJECT_ID = userdata.get('GCP_PROJECT_ID') or PROJECT_ID
    except Exception:
        # Best-effort lookup. Catching Exception (not a bare except) lets
        # KeyboardInterrupt and SystemExit propagate.
        pass
except ImportError:
    YOTO_CLIENT_ID = "YOUR_YOTO_ID_HERE"
# --- üì¶ INSTALLATION ---
# Announce the step, then install/upgrade (-U) the packages quietly (-q).
# NOTE(review): the leading "!" is an IPython/Colab shell escape; this line is
# only valid inside a notebook kernel, not in a plain Python script.
print("üì¶ Installing Vertex AI & Tools...")
!pip install -q -U google-cloud-aiplatform rembg[gpu] onnxruntime

import vertexai
from vertexai.preview.vision_models import ImageGenerationModel
from vertexai.generative_models import GenerativeModel
from google.colab import auth
from rembg import remove

# --- üîë AUTHENTICATION ---
print("\nüîê Authenticating with Google Cloud...")
auth.authenticate_user()
vertexai.init(project=PROJECT_ID, location=LOCATION)

print("üß† Loading Models...")
try:
    # 1. The "Brain" - writes the creative prompts (used by generate_smart_prompt).
    text_model = GenerativeModel("gemini-1.5-flash-001")

    # 2. The "Painter" - creates the pixels (used by generate_icon).
    image_model = ImageGenerationModel.from_pretrained("imagen-3.0-fast-generate-001")
    print("   ‚úÖ Models Loaded Successfully")
except Exception as e:
    print(f"‚ùå Error loading models: {e}")
    # Fail fast: continuing would leave text_model / image_model undefined,
    # and every track would then fail later with a less obvious error.
    raise

# --- üéõÔ∏è CONFIG ---
# Size of the thread pool main() uses to process tracks.
MAX_WORKERS = 1
# Number of attempts generate_icon() makes per icon before giving up.
MAX_RETRIES = 3
# Seconds generate_icon() sleeps before its first attempt at each icon.
COOLDOWN_SECONDS = 4     # Fast model is quick
# When True, tracks whose current icon is one of the user's own uploaded
# icons are skipped instead of regenerated.
SKIP_EXISTING = True

# --- üîê AUTHENTICATION (YOTO) ---
def authenticate_yoto(client_id):
    """Run the OAuth 2.0 device-authorization flow against Yoto's login server.

    Requests a device code, prints the approval link, then polls the token
    endpoint until the user approves, the request is rejected, or the device
    code expires.

    Args:
        client_id: Yoto OAuth client ID.

    Returns:
        The access token string, or None if the flow could not be completed.
    """
    print("\nüîÑ Yoto Auth: Requesting Device Code...")
    auth_url = "https://login.yotoplay.com/oauth/device/code"
    token_url = "https://login.yotoplay.com/oauth/token"

    payload = {
        "client_id": client_id,
        "scope": "offline_access profile openid manage_content upload_icons",
        "audience": "https://api.yotoplay.com"
    }

    try:
        res = requests.post(auth_url, data=payload, timeout=30)
        res.raise_for_status()
        data = res.json()
        # Read the required fields here so a malformed reply is reported as an
        # auth failure instead of raising KeyError further down.
        device_code = data['device_code']
        approve_url = data['verification_uri_complete']
    except Exception as e:
        print(f"‚ùå Yoto Auth Failed: {e}")
        return None

    print("\n" + "="*60)
    print(f"üöÄ CLICK TO APPROVE YOTO: {approve_url}")
    print("="*60)

    interval = data.get('interval', 5)
    # Stop polling once the device code can no longer be redeemed. The
    # 15-minute default only applies if the server omits `expires_in`.
    deadline = time.monotonic() + data.get('expires_in', 900)

    while time.monotonic() < deadline:
        time.sleep(interval)
        try:
            token_res = requests.post(token_url, data={
                "grant_type": "urn:ietf:params:oauth:grant-type:device_code",
                "device_code": device_code,
                "client_id": client_id
            }, timeout=30)
        except requests.RequestException:
            # Transient network problem: keep polling until the deadline.
            continue

        if token_res.status_code == 200:
            print("‚úÖ Yoto Authenticated!")
            return token_res.json()['access_token']

        try:
            error = token_res.json().get('error')
        except (ValueError, AttributeError):
            error = None

        if error == 'slow_down':
            # The server asked us to poll less often.
            interval += 5
        elif error in ('access_denied', 'expired_token'):
            # Terminal outcomes: polling again can never succeed.
            print(f"‚ùå Yoto Auth Failed: {error}")
            return None
        # Anything else (e.g. authorization_pending) means "keep waiting".

    print("‚ùå Yoto Auth Failed: device code expired before approval")
    return None

# --- üìÇ HELPERS ---
def get_user_custom_ids(token):
    """Return the IDs of the display icons the user has already uploaded.

    Args:
        token: Yoto API bearer token.

    Returns:
        A set of icon IDs (each icon's 'mediaId', else its 'id'). The set is
        empty when SKIP_EXISTING is off or when the list cannot be fetched;
        in the latter case a warning is printed, because it means no track
        will be skipped.
    """
    if not SKIP_EXISTING:
        return set()

    url = "https://api.yotoplay.com/media/displayIcons/user/me"
    try:
        res = requests.get(url, headers={'Authorization': f'Bearer {token}'}, timeout=30)
        if res.status_code != 200:
            print(f"   ‚ö†Ô∏è Could not list existing icons (HTTP {res.status_code}); nothing will be skipped.")
            return set()

        ids = set()
        for icon in res.json().get('displayIcons') or []:
            mid = icon.get('mediaId') or icon.get('id')
            if mid:
                ids.add(mid)
        return ids
    except Exception as e:
        # Best-effort lookup: never abort the run, but say why skipping is off.
        print(f"   ‚ö†Ô∏è Could not list existing icons ({e}); nothing will be skipped.")
        return set()

# --- üß† SMART PROMPTING ---
def generate_smart_prompt(title):
    """Build an image-generation prompt for a track title.

    Asks the text model to turn the title into a short pixel-art prompt and
    falls back to a plain template whenever that is not possible.

    Args:
        title: Track title. Falsy values (None, "") yield a generic prompt.

    Returns:
        A single-line prompt string. If the text model call fails, or its
        reply is empty or longer than 200 characters, the template
        "pixel art icon of <title>, white background" is returned instead.
    """
    if not title:
        return "pixel art icon of a cute toy, white background"

    fallback = f"pixel art icon of {title}, white background"

    # This instruction teaches the model how to write for the image model.
    instruction = (
        "You are an expert prompt engineer for pixel art icons. "
        f"Based on the track title: '{title}', write a descriptive image prompt.\n"
        "RULES:\n"
        "1. Identify the main concrete subject (animal, object, character).\n"
        "2. If the title is abstract (e.g. 'The Argument'), choose a visual metaphor (e.g. 'two angry speech bubbles').\n"
        "3. Output format: 'pixel art icon of [SUBJECT], [ACTION/DETAIL], [COLOR], white background, isolated, 8-bit style'.\n"
        "4. Keep it under 20 words.\n"
        "5. Return ONLY the prompt string."
    )

    try:
        res = text_model.generate_content(instruction)
        # Drop quotes and collapse all whitespace (including newlines) to
        # single spaces so a multi-line reply does not glue words together.
        prompt = " ".join(res.text.replace('"', '').split())
    except Exception:
        # Any model/API problem: use the template rather than fail the track.
        return fallback

    # Fallback if the model returned nothing or chatted too much.
    if not prompt or len(prompt) > 200:
        return fallback
    return prompt

# --- üé® GENERATION ---
def generate_icon(prompt_str):
    """Generate one 16x16 RGBA PNG icon for a prompt.

    Makes up to MAX_RETRIES attempts. Each attempt requests a single square
    image, passes it through rembg's remove(), crops to the bounding box,
    downsamples to 16x16 with nearest-neighbour sampling and boosts colour
    by a factor of 1.5.

    Args:
        prompt_str: Subject prompt; fixed style keywords are appended to it.

    Returns:
        A BytesIO positioned at 0 containing the PNG, or None if the request
        was reported as blocked or every attempt failed.
    """
    # We add standard technical keywords to the smart prompt.
    final_prompt = f"{prompt_str}, centered, distinct silhouette, vibrant colors, clean lines, no text"

    for attempt in range(MAX_RETRIES):
        is_last_attempt = attempt == MAX_RETRIES - 1
        try:
            # Short cooldown before the first call, longer pause before retries.
            time.sleep(10 if attempt > 0 else COOLDOWN_SECONDS)

            response = image_model.generate_images(
                prompt=final_prompt,
                number_of_images=1,
                aspect_ratio="1:1",
                person_generation="allow_adult",
            )

            if not response.images:
                # Previously retried silently; say why this attempt failed.
                print("      ‚ö†Ô∏è Error: model returned no images")
                continue

            img_bytes = response.images[0]._image_bytes
            image = Image.open(BytesIO(img_bytes))

            # Post-process
            image = remove(image)
            bbox = image.getbbox()
            if bbox:
                image = image.crop(bbox)

            # NOTE(review): the crop is resized straight to 16x16, so a
            # non-square crop is stretched - confirm this is intended.
            image_16 = image.resize((16, 16), resample=Image.Resampling.NEAREST)
            image_16 = ImageEnhance.Color(image_16).enhance(1.5)

            if image_16.mode != 'RGBA':
                image_16 = image_16.convert('RGBA')

            output = BytesIO()
            image_16.save(output, format='PNG')
            output.seek(0)
            return output

        except Exception as e:
            message = str(e)
            if "429" in message or "quota" in message.lower():
                if is_last_attempt:
                    # No retry follows, so waiting would only waste 30 seconds.
                    print("      üõë Quota Limit. Giving up.")
                else:
                    print("      üõë Quota Limit. Waiting 30s...")
                    time.sleep(30)
            elif "blocked" in message.lower():
                # Retrying the same prompt will be blocked again.
                print("      üõ°Ô∏è Safety Block on prompt")
                return None
            else:
                print(f"      ‚ö†Ô∏è Error: {e}")

    return None

def upload_to_yoto(token, img_buffer):
    """Upload a PNG icon to the user's Yoto display-icon library.

    Args:
        token: Yoto API bearer token.
        img_buffer: Seekable binary buffer holding the PNG; it is rewound and
            read in full.

    Returns:
        The new icon's ID ('displayIcon.mediaId', else top-level 'id') on
        HTTP 200/201, otherwise None. Failures are printed, not raised.
    """
    url = "https://api.yotoplay.com/media/displayIcons/user/me/upload"
    img_buffer.seek(0)
    headers = {'Authorization': f'Bearer {token}', 'Content-Type': 'image/png'}
    try:
        res = requests.post(
            url,
            headers=headers,
            data=img_buffer.read(),
            params={'autoConvert': 'true'},
            timeout=60,
        )
        if res.status_code not in (200, 201):
            print(f"      ‚ö†Ô∏è Error: icon upload failed (HTTP {res.status_code})")
            return None
        data = res.json()
        # `or {}` keeps the top-level 'id' fallback reachable when
        # 'displayIcon' is present but null.
        return (data.get('displayIcon') or {}).get('mediaId') or data.get('id')
    except Exception as e:
        # Best-effort: one failed upload must not abort the whole run.
        print(f"      ‚ö†Ô∏è Error: icon upload failed: {e}")
        return None

# --- üöÄ MAIN LOOP ---
def main():
    """Interactively regenerate the icons of one Yoto playlist.

    Authenticates with Yoto, lists the user's cards, asks which one to
    process, generates and uploads an icon per chapter through a thread pool,
    then posts the updated card back if at least one icon changed. Progress
    and errors are printed; nothing is returned.
    """
    if PROJECT_ID == "YOUR_PROJECT_ID_HERE":
        print("‚ùå Please enter your Google Cloud PROJECT_ID at the top.")
        return

    token = authenticate_yoto(YOTO_CLIENT_ID)
    if not token:
        return

    auth_headers = {'Authorization': f'Bearer {token}'}
    user_custom_ids = get_user_custom_ids(token)

    print("\nüìö Fetching Library...")
    lib = requests.get("https://api.yotoplay.com/content/mine", headers=auth_headers).json()
    cards = lib.get('cards', [])

    if not cards:
        print("‚ùå No playlists found.")
        return

    for i, c in enumerate(cards):
        print(f"{i+1:<3} | {(c.get('title') or 'Untitled')[:40]}")

    # Validate the choice: "0" or a negative number would otherwise wrap
    # around to a card at the end of the list, and non-numeric or too-large
    # input would crash.
    raw_choice = input("\nEnter #: ")
    try:
        sel = int(raw_choice) - 1
    except ValueError:
        sel = -1
    if not 0 <= sel < len(cards):
        print(f"‚ùå Invalid selection: {raw_choice!r}. Enter a number from 1 to {len(cards)}.")
        return

    selected = cards[sel]
    card_id = selected.get('cardId') or selected.get('id')

    print(f"\nüì• Loading: {selected.get('title')}")
    full_data = requests.get(f"https://api.yotoplay.com/content/{card_id}", headers=auth_headers).json()

    payload = full_data.get('card', full_data)
    # The chapter list is looked up in three places; `or {}` guards against a
    # null 'content' value.
    chapters = (payload.get('content') or {}).get('chapters') or payload.get('items') or payload.get('chapters')

    if not chapters:
        print("‚ùå No chapters found.")
        return

    print(f"üî® Processing {len(chapters)} tracks...")

    updates = 0
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as exe:
        futures = {
            exe.submit(process_single_track_wrapper, i, chap, token, user_custom_ids): i
            for i, chap in enumerate(chapters)
        }

        for future in concurrent.futures.as_completed(futures):
            idx = futures[future]
            try:
                _, media_id = future.result()
            except Exception as e:
                # One bad track must not discard icons already generated.
                print(f"   ‚ùå Failed: track {idx + 1}: {e}")
                continue
            if not media_id:
                continue

            updates += 1
            new_icon = f"yoto:#{media_id}"

            # Apply the icon to the chapter and to each of its tracks.
            chap = chapters[idx]
            chap.setdefault('display', {})['icon16x16'] = new_icon
            for t in chap.get('tracks', []):
                t.setdefault('display', {})['icon16x16'] = new_icon

    if updates > 0:
        print(f"\nüì° Uploading {updates} icons...")
        res = requests.post("https://api.yotoplay.com/content", headers=auth_headers, json=payload)
        if res.status_code == 200:
            print("‚ú® Success! Remaster complete.")
        else:
            print(f"‚ùå Update failed: {res.text}")
    else:
        print("\n‚ú® No changes needed.")

def process_single_track_wrapper(idx, chapter, token, custom_ids):
    """Generate and upload an icon for one chapter.

    Args:
        idx: Position of the chapter in the playlist; returned unchanged so
            the caller can match results to chapters.
        chapter: Chapter dict; its 'title' and 'display.icon16x16' are read.
        token: Yoto API bearer token.
        custom_ids: IDs of icons the user already uploaded.

    Returns:
        (idx, media_id) where media_id is the uploaded icon's ID, or None if
        the chapter was skipped or generation/upload failed.
    """
    title = chapter.get('title', 'Track')

    # `or` guards: 'display' or 'icon16x16' may be present but null.
    curr = (chapter.get('display') or {}).get('icon16x16') or ''
    prefix = 'yoto:#'
    if SKIP_EXISTING and curr.startswith(prefix) and curr[len(prefix):] in custom_ids:
        print(f"   ‚è≠Ô∏è Skipped: {title}")
        return idx, None

    # Step 1: Use the "Smart Brain" to write the prompt
    smart_prompt = generate_smart_prompt(title)

    # Step 2: Use the "Painter" to create the icon
    icon_data = generate_icon(smart_prompt)

    if not icon_data:
        print(f"   ‚ùå Failed: {title}")
        return idx, None

    print(f"   ‚úÖ Generated: {title}")
    mid = upload_to_yoto(token, icon_data)
    if not mid:
        # Generation worked but the icon never reached Yoto; say so instead
        # of leaving the track looking successful.
        print(f"   ‚ùå Upload failed: {title}")
    return idx, mid

# Entry point: run the interactive workflow when this code executes as the
# main module.
# NOTE(review): presumably this also holds when run as a notebook cell (the
# captured output below suggests main() ran) - confirm.
if __name__ == "__main__":
    main()

üì¶ Installing Vertex AI & Tools...

üîê Authenticating with Google Cloud...
üß† Loading Models...
   ‚úÖ Models Loaded Successfully

üîÑ Yoto Auth: Requesting Device Code...

üöÄ CLICK TO APPROVE YOTO: https://login.yotoplay.com/activate?user_code=QXBF-TQNM
‚úÖ Yoto Authenticated!

üìö Fetching Library...
1   | Christmas Time üéÑ
2   | Paw Patrol
3   | Animal Sounds 
4   | Test icon generator 
5   | Family Voices

Enter #: 3

üì• Loading: Animal Sounds 
üî® Processing 99 tracks...
   ‚è≠Ô∏è Skipped: 01 Cat, Meowing, Excited Tom Cat, Animal
   ‚è≠Ô∏è Skipped: 02 Angry Cats Fighting
   ‚è≠Ô∏è Skipped: 03 Cats, Domestic Meowing
   ‚è≠Ô∏è Skipped: 04 Mog Cat Meowing
   ‚è≠Ô∏è Skipped: 05 Ten Week Old Kitten, Meowing
   ‚è≠Ô∏è Skipped: 06 Tom Cat, Meowing
   ‚è≠Ô∏è Skipped: 07 Persian Cat Meow
   ‚è≠Ô∏è Skipped: 08 Lion Roar, Throaty
   ‚è≠Ô∏è Skipped: 09 Tiger Roar
   ‚è≠Ô∏è Skipped: 11 Jaguar TWO Jaguars,Angry X 3
   ‚è≠Ô∏è Skipped: 12 Puma Roar
   ‚è≠Ô∏è Skipped: 13 Cheetah
   