In [None]:
# @title 1. Environment Setup
!pip install -q datasets huggingface_hub
import os
import re
import json
import shutil
import time
from datasets import load_dataset
from google.colab import drive
from google.colab import userdata
from huggingface_hub import login

# 1. Mount Google Drive (skipped when the mount point is already present)
if not os.path.exists('/content/drive'):
    drive.mount('/content/drive')

# 2. Authenticate against Hugging Face
# The token is read from the Colab Secrets panel (Key Icon) under 'HF_TOKEN'.
# A failure here is reported but does not abort the notebook.
try:
    hf_token = userdata.get('HF_TOKEN')
    login(hf_token)
except Exception as err:
    print(f"[WARN] Could not login to Hugging Face: {err}")
    print("       (Make sure 'HF_TOKEN' is set in Colab Secrets)")
else:
    print("[INFO] Logged in to Hugging Face successfully.")

In [None]:
# @title 2. Configuration
# --- Paths ---
# Working copy that the download job writes to before it is copied to Drive.
LOCAL_FILE = "/content/shaders_archive_temp.jsonl"
# Destination folder and file for the copied archive.
DRIVE_DIR = "/content/drive/MyDrive/projects/EarthShader/dataset/thestack"
FINAL_DRIVE_FILE = os.path.join(DRIVE_DIR, "shaders_archive.jsonl")

# --- Execution limit ---
# Stop after this many saved shaders; None means no limit (full production run).
LIMIT = None

# --- License allow-list (strict, permissive licenses only) ---
SAFE_LICENSES = [
    "MIT",
    "Apache-2.0",
    "BSD-3-Clause",
    "BSD-2-Clause",
    "CC0-1.0",
    "Unlicense",
    "ISC",
    "BlueOak-1.0.0",
]

In [None]:
# @title 3. Helper Functions (Sanitization, Comments & Filters)
import re
import json

def extract_code_from_json(text):
    """Unwrap shader source that was stored as a JSON dump.

    If the stripped text looks like a JSON object containing a ``"code"`` key
    and parses successfully, the string value of that top-level key is
    returned. In every other case (not JSON-like, parse failure, key missing,
    or the value is not a string) the stripped input text is returned.

    Args:
        text: Raw file content (a string).

    Returns:
        The extracted code string, or the stripped input text.
    """
    text = text.strip()

    looks_like_json = (
        text.startswith("{") and text.endswith("}") and '"code":' in text
    )
    if not looks_like_json:
        return text

    try:
        data = json.loads(text)
    except (ValueError, RecursionError):
        # ValueError covers json.JSONDecodeError; RecursionError guards against
        # pathologically nested input. Either way, treat it as plain code.
        return text

    # Only accept a real string: a null/number/list "code" value would break
    # the string processing that callers apply to the result.
    code = data.get("code")
    return code if isinstance(code, str) else text

def strip_comments(code):
    """Remove C-style comments (``//`` line and ``/* */`` block comments).

    Both comment kinds are matched in a single left-to-right pass, so
    whichever comment starts first wins. This prevents a ``/*`` that appears
    inside a ``//`` comment from being treated as the start of a block comment
    (which would delete real code up to the next ``*/``).

    Line comments are removed up to (not including) the newline. Block
    comments are replaced by a single space so that the tokens on either side
    are not fused together (``int/*c*/x`` becomes ``int x``).

    Args:
        code: Shader source text.

    Returns:
        The source text with comments removed.
    """
    def _replacement(match):
        # Block comment -> one space; line comment -> nothing.
        return " " if match.group(0).startswith("/*") else ""

    return re.sub(r'//[^\n]*|/\*[\s\S]*?\*/', _replacement, code)

def collapse_newlines(code):
    """Strip trailing whitespace and squeeze runs of blank lines down to one.

    Each line is right-stripped first, so whitespace-only lines count as
    blank. Of any consecutive run of blank lines only the first is kept.
    The result is joined with "\\n" and has no trailing newline.
    """
    kept = []
    previous_was_blank = False

    for raw_line in code.splitlines():
        stripped = raw_line.rstrip()
        is_blank = stripped == ""

        # Skip the second and later blank lines of a run.
        if is_blank and previous_was_blank:
            continue

        kept.append(stripped)
        previous_was_blank = is_blank

    return "\n".join(kept)

def clean_shader_code(code):
    """Run the full cleaning pipeline on one raw shader source.

    Steps, in order:
      1. Unwrap a JSON dump (via ``extract_code_from_json``) and turn literal
         ``\\n`` / ``\\t`` escape sequences into real newlines / tabs.
      2. Strip comments (via ``strip_comments``).
      3. Rename legacy identifiers to their Shadertoy names.
      4. Remove redeclarations of Shadertoy built-in inputs.
      5. Remove ``varying`` declarations.
      6. Remove a ``void main(...) { ... mainImage ... }`` wrapper.
      7. Collapse blank lines (via ``collapse_newlines``) and trim.

    Args:
        code: Raw shader text (possibly a JSON dump containing it).

    Returns:
        The cleaned source, or "" for empty / non-string input.
    """
    if not code or not isinstance(code, str):
        return ""

    # 1. Unwrap JSON & Fix Newlines
    code = extract_code_from_json(code)
    if not isinstance(code, str):
        # Defensive: the unwrapped "code" field must be text to be processed.
        return ""
    code = code.replace("\\n", "\n").replace("\\t", "\t")

    # 2. Strip Comments
    code = strip_comments(code)

    # 3. Rename Legacy Variables
    replacements = {
        "iGlobalTime": "iTime",
        "aTexture0": "iChannel0",
        "aTexture1": "iChannel1",
        "aTexture2": "iChannel2",
        "aTexture3": "iChannel3",
        "u_tex0": "iChannel0",
        "image.Sample": "texture"
    }
    for old, new in replacements.items():
        code = code.replace(old, new)

    # 4. Remove Redefinitions (Aggressive)
    # Catches: "uniform float iTime;", "vec3 iResolution;", "float iTime;"
    # Names are matched as whole identifiers (\b), so an unrelated uniform
    # such as "iTimeScale" keeps its declaration.
    builtins = (
        r"\b(iResolution|iTimeDelta|iTime|iFrameRate|iFrame|iMouse|iDate"
        r"|iChannelTime|iChannelResolution|iSampleRate)\b"
    )

    # Pattern A: Uniforms (e.g. "uniform vec3 iResolution;")
    code = re.sub(r"^\s*uniform\s+.*?" + builtins + r".*?;", "", code, flags=re.MULTILINE)

    # Pattern B: Explicit types (e.g. "float iTime;", "vec3 iResolution;")
    # The (?!return\b) guard keeps statements like "return iTime;", which
    # have the same "<word> <builtin>;" shape but are not declarations.
    code = re.sub(
        r"^\s*(?!return\b)[a-zA-Z0-9]+\s+" + builtins + r"\s*;",
        "",
        code,
        flags=re.MULTILINE,
    )

    # 5. Remove "varying" lines
    code = re.sub(r'^\s*varying\s+.*?;', '', code, flags=re.MULTILINE)

    # 6. Strip Wrapper
    # Matches "void main(void)" and "void main()" by using [^)]*.
    # The negated classes [^)] and [^}] already match newlines, so a
    # multi-line wrapper is covered without any extra flag. A wrapper whose
    # body contains nested braces is not matched.
    wrapper_pattern = r'void\s+main\s*\([^)]*\)\s*\{[^}]*mainImage[^}]*\}'
    code = re.sub(wrapper_pattern, '', code)

    # 7. Final Polish
    code = collapse_newlines(code)

    return code.strip()

def is_strictly_shadertoy(code):
    """Return True only for self-contained, procedural Shadertoy shaders.

    The code must contain a ``void mainImage`` entry point, must not use
    ``#include``, must not mention any texture/channel keyword, and must
    reference at least one of ``fragCoord`` or ``iResolution``.
    """
    if not code:
        return False

    # Entry point is required; external includes are not allowed.
    has_entry_point = "void mainImage" in code
    if not has_entry_point or "#include" in code:
        return False

    # Procedural only: any texture-related keyword disqualifies the shader.
    texture_markers = (
        "iChannel",
        "texture(",
        "texture2D(",
        "sampler2D",
        "texelFetch",
        "image.Sample",
    )
    for marker in texture_markers:
        if marker in code:
            return False

    # Must consume at least one of the standard inputs.
    return "fragCoord" in code or "iResolution" in code

def is_safe_license(license_list):
    """Return True if at least one listed license is in SAFE_LICENSES.

    An empty or missing list is treated as not safe.
    """
    if not license_list:
        return False

    for candidate in license_list:
        if candidate in SAFE_LICENSES:
            return True
    return False

def save_to_drive():
    """Copy the local archive (LOCAL_FILE) to FINAL_DRIVE_FILE.

    The destination directory is created if needed. The data is first copied
    to a sibling ``.tmp`` file and then moved over the final name, so an
    interrupted copy cannot leave a truncated file in place of the previous
    complete backup.

    Raises:
        OSError: If the directory cannot be created or the copy/move fails.
    """
    os.makedirs(DRIVE_DIR, exist_ok=True)

    tmp_path = FINAL_DRIVE_FILE + ".tmp"
    shutil.copy(LOCAL_FILE, tmp_path)
    # Same-directory rename: replaces any existing backup in one step.
    os.replace(tmp_path, FINAL_DRIVE_FILE)

In [None]:
# @title 4. Start Download Job
def download_production():
    """Stream GLSL files from 'The Stack', filter/clean them, and archive them.

    For every streamed sample the function:
      1. skips it unless ``is_safe_license`` accepts its license list,
      2. cleans the content with ``clean_shader_code``,
      3. keeps it only if ``is_strictly_shadertoy`` accepts the cleaned code,
      4. appends a JSON record (id, license, repo, code) to LOCAL_FILE.

    Every 500 saved shaders the local file is flushed and copied to Drive via
    ``save_to_drive``; a final copy is made when the loop ends, is interrupted
    with Ctrl-C, or fails with a streaming error. If LIMIT is set, the run
    stops once that many shaders have been saved.

    Returns:
        None. Progress and a summary are printed.
    """
    max_reported_errors = 5  # Avoid flooding the output with per-sample errors

    print("[*] Starting PRODUCTION Download from 'The Stack'...")
    print(f"[*] Target: {FINAL_DRIVE_FILE}")

    # Load dataset in Streaming Mode
    try:
        ds = load_dataset("bigcode/the-stack", data_dir="data/glsl", split="train", streaming=True)
    except Exception as e:
        print(f"[!] Error connecting to Hugging Face: {e}")
        return

    saved_count = 0
    scanned_count = 0
    error_count = 0
    completed = False
    start_time = time.time()

    print("[*] Stream opened. Scanning & Cleaning...")

    with open(LOCAL_FILE, "w", encoding="utf-8") as f:
        try:
            for sample in ds:
                scanned_count += 1

                # Update Status every 10,000 scanned files
                if scanned_count % 10000 == 0:
                    elapsed = time.time() - start_time
                    rate = scanned_count / (elapsed + 0.01)
                    print(f"   [Scanning] Checked: {scanned_count:,} | Saved: {saved_count:,} | Speed: {rate:.0f} files/sec", end="\r", flush=True)

                try:
                    raw_code = sample.get("content", "")
                    licenses = sample.get("max_stars_repo_licenses") or []

                    # 1. License Filter (Fastest check)
                    if not is_safe_license(licenses):
                        continue

                    # 2. Clean the Code
                    clean_code = clean_shader_code(raw_code)

                    # 3. Strict Shadertoy Validation (on cleaned code)
                    if is_strictly_shadertoy(clean_code):
                        # Record the license that passed the allow-list, not
                        # merely the first one listed for the repository.
                        matched_license = next(
                            (lic for lic in licenses if lic in SAFE_LICENSES),
                            licenses[0],
                        )
                        record = {
                            "id": sample.get("hexsha"),
                            "license": matched_license,
                            "repo": sample.get("max_stars_repo_name", "unknown"),
                            "code": clean_code
                        }
                        f.write(json.dumps(record) + "\n")
                        saved_count += 1

                        # Backup every 500 saved shaders
                        if saved_count % 500 == 0:
                            # Flush first: otherwise buffered records are not
                            # on disk yet and the copy may end mid-record.
                            f.flush()
                            try:
                                save_to_drive()
                                print(f"   [Backup] Saved {saved_count} shaders to Drive...                ", end="\r", flush=True)
                            except OSError as backup_err:
                                # Keep scanning; the next backup or the final
                                # save will try again.
                                print(f"\n[WARN] Drive backup failed: {backup_err}")

                    if LIMIT and saved_count >= LIMIT:
                        print("\n[*] Limit reached.")
                        break

                except Exception as sample_err:
                    # Best effort: one bad sample must not stop the run, but
                    # the failure is counted and the first few are shown.
                    error_count += 1
                    if error_count <= max_reported_errors:
                        print(f"\n[WARN] Skipped sample #{scanned_count:,}: {sample_err!r}")
                    continue

            completed = True

        except KeyboardInterrupt:
            print("\n\n[!] Interrupted by user. Saving progress...")
        except Exception as stream_err:
            # e.g. a connection failure while iterating the stream; fall
            # through so that everything collected so far is still saved.
            print(f"\n\n[!] Stream failed: {stream_err!r}. Saving progress...")

    # Final Save (the file is closed here, so all records are on disk)
    save_to_drive()

    total_time = (time.time() - start_time) / 60
    if completed:
        print(f"\n\n[SUCCESS] Run Complete in {total_time:.1f} minutes.")
    else:
        print(f"\n\n[PARTIAL] Run stopped early after {total_time:.1f} minutes.")
    print(f"[-] Total Scanned: {scanned_count:,}")
    print(f"[-] Total Saved:   {saved_count:,}")
    print(f"[-] Skipped (errors): {error_count:,}")
    print(f"[-] Location:      {FINAL_DRIVE_FILE}")

# Start the download job when this code is executed as the main module
# (not when it is imported).
if __name__ == "__main__":
    download_production()

In [None]:
# @title 5. Inspect & Verify Data
import random

def inspect_shader(index=None):
    """
    Displays a shader from the dataset and runs a few quality checks on it.

    - index (int): Show specific shader by line number (0-indexed).
                   An out-of-range index falls back to a random shader.
    - index (None): Show a random shader.

    Prints metadata, up to the first 10,000 characters of the code, and a
    list of detected issues (leftover wrapper code or texture usage).
    Returns None.
    """
    if not os.path.exists(FINAL_DRIVE_FILE):
        print("[!] Output file not found. Run the download first.")
        return

    # Read all lines so any record can be picked by index.
    # (The dataset is ~50-100MB, fitting easily in RAM for quick debugging)
    # The archive is written as UTF-8, so read it back the same way.
    with open(FINAL_DRIVE_FILE, 'r', encoding="utf-8") as f:
        lines = f.readlines()

    total_count = len(lines)
    if total_count == 0:
        print("[!] Dataset is empty.")
        return

    # Select Target
    if index is not None:
        if 0 <= index < total_count:
            target_idx = index
            print(f"[*] Selecting SPECIFIC shader at Index {target_idx}...")
        else:
            print(f"[!] Index {index} out of bounds (0-{total_count-1}). Showing random instead.")
            target_idx = random.randint(0, total_count - 1)
    else:
        target_idx = random.randint(0, total_count - 1)
        print(f"[*] Selecting RANDOM shader (Index {target_idx} of {total_count})...")

    # Parse
    try:
        data = json.loads(lines[target_idx])
    except json.JSONDecodeError:
        print(f"[!] Error decoding JSON at line {target_idx}")
        return
    if not isinstance(data, dict):
        # A valid JSON value that is not an object cannot hold a record.
        print(f"[!] Unexpected record format at line {target_idx}")
        return

    # Display Metadata
    print("\n" + "="*80)
    print(f"SHADER METADATA (Index {target_idx})")
    print(f"ID:      {data.get('id')}")
    print(f"Repo:    {data.get('repo')}")
    print(f"License: {data.get('license')}")
    print("="*80 + "\n")

    # Display Code ("or ''" also covers an explicit null value)
    code = data.get('code') or ''
    print(code[:10000]) # Print first 10k characters (likely whole file)
    if len(code) > 10000:
        print("\n... [Truncated at 10k characters] ...")

    print("\n" + "="*80)

    # Automated Quality Checks
    print("QUALITY CHECK:")
    issues = []

    # Check for wrappers
    if "uniform vec3 iResolution" in code: issues.append("Wrapper: Found 'uniform iResolution'")
    # Regex so that "void main(void)" and spacing variants are caught too.
    if re.search(r"void\s+main\s*\(", code): issues.append("Wrapper: Found 'void main()' footer")

    # Check for textures (Crucial for black screen prevention)
    texture_markers = ("iChannel", "texture(", "texture2D(", "sampler2D", "texelFetch")
    if any(marker in code for marker in texture_markers):
        issues.append("Dependency: Found external texture lookup")

    if not issues:
        print("YAY Code looks clean (Pure Procedural Shadertoy format).")
        print("    Copy-paste this into https://www.shadertoy.com/new to test.")
    else:
        for issue in issues:
            print(f"XXX {issue}")
    print("="*80)

# --- HOW TO USE ---
# inspect_shader()       <-- Shows a random shader
# inspect_shader(5)      <-- Shows the 6th shader (Index 5)

# Default action when this cell runs: display one random shader.
inspect_shader()