<a href="https://colab.research.google.com/github/mshumer/sora-extend/blob/main/Sora_Extend.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Sora 2 — AI‑Planned, Scene‑Exact Prompts with Continuity (Chained >12s)

Built by [Matt Shumer](https://x.com/mattshumer_).

Pipeline:
1) Use an LLM (“GPT‑5 Thinking”) to plan N scene prompts from a base idea. The planner is instructed to write each prompt so that it begins where the previous segment ended.
2) Render each segment with Sora 2; for continuity, pass the prior segment’s **final frame** as `input_reference`.
3) Concatenate segments into a single MP4.
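
To make step 2 concrete, here is a minimal dry-run sketch of the chaining logic with no API calls. `chain_segments`, `fake_render`, and `fake_last_frame` are hypothetical stand-ins for the real Sora request and frame extraction; only the way the reference is passed from one segment to the next mirrors the pipeline above.

```python
from typing import Callable, List, Optional, Tuple

def chain_segments(
    prompts: List[str],
    render: Callable[[str, Optional[str]], str],
    last_frame: Callable[[str], str],
) -> List[Tuple[str, Optional[str]]]:
    """Render prompts in order, feeding each segment's final frame to the next.

    Returns (video, reference_used) pairs so the hand-off can be inspected.
    """
    reference: Optional[str] = None  # segment 1 has no prior frame
    results = []
    for prompt in prompts:
        video = render(prompt, reference)
        results.append((video, reference))
        reference = last_frame(video)  # becomes input_reference for the next segment
    return results

# Hypothetical stand-ins (assumptions for illustration only)
def fake_render(prompt: str, reference: Optional[str]) -> str:
    return f"{prompt}.mp4"

def fake_last_frame(video: str) -> str:
    return video.replace(".mp4", "_last.jpg")

plan = chain_segments(["shot1", "shot2", "shot3"], fake_render, fake_last_frame)
for video, ref in plan:
    print(video, "<-", ref)
```

Each segment after the first receives the previous segment's last frame, which is why the segments have to be rendered sequentially rather than in parallel.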

In [None]:
# @title 1) Install & imports

import sys, subprocess, importlib.util, shutil, os, textwrap, tempfile

def pip_install(*pkgs):
    # Install into the *current* kernel's interpreter
    subprocess.check_call([sys.executable, "-m", "pip", "install", "-U", *pkgs])

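def ensure(spec_name, *pip_pkgs):
    # Install the package(s) only if the module cannot already be found
    if importlib.util.find_spec(spec_name) is None:
        pip_install(*pip_pkgs)
        importlib.invalidate_caches()  # let the import system see the fresh install
    return importlib.util.find_spec(spec_name) is not None

MOVIEPY_AVAILABLE = ensure("moviepy", "moviepy>=2.0.0", "imageio", "imageio-ffmpeg")

# Try to import MoviePy if now available
if MOVIEPY_AVAILABLE:
    try:
        from moviepy import VideoFileClip, concatenate_videoclips  # MoviePy 2.x
    except ImportError:
        from moviepy.editor import VideoFileClip, concatenate_videoclips  # MoviePy 1.x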
else:
    # Fallback concat uses ffmpeg (from imageio-ffmpeg or system)
    try:
        import imageio_ffmpeg
        FFMPEG_BIN = imageio_ffmpeg.get_ffmpeg_exe()
    except Exception:
        FFMPEG_BIN = shutil.which("ffmpeg")

    if not FFMPEG_BIN:
        # Final attempt to get ffmpeg via pip
        pip_install("imageio-ffmpeg")
        import imageio_ffmpeg
        FFMPEG_BIN = imageio_ffmpeg.get_ffmpeg_exe()

    if not FFMPEG_BIN:
        raise RuntimeError(
            "FFmpeg not found and MoviePy unavailable. "
            "Install ffmpeg on your system or allow pip installs."
        )

print("MoviePy available:", MOVIEPY_AVAILABLE)

!pip -q install --upgrade openai requests opencv-python-headless imageio[ffmpeg]

import os, re, io, json, time, math, mimetypes
from pathlib import Path
import requests
import cv2
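try:
    from moviepy import VideoFileClip, concatenate_videoclips  # MoviePy 2.x (moviepy.editor was removed)
except ImportError:
    from moviepy.editor import VideoFileClip, concatenate_videoclips  # MoviePy 1.x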
from IPython.display import Video as IPyVideo, display
from openai import OpenAI

# 2) Config

Fill in:
- OPENAI_API_KEY
- SECONDS_PER_SEGMENT (options: 4, 8, 12)
- NUM_GENERATIONS (the total number of segments to generate and concatenate; the final video length is `SECONDS_PER_SEGMENT * NUM_GENERATIONS` seconds)
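
The length arithmetic from the list above can be sketched as a pair of small helpers. `total_length` and `generations_for` are hypothetical names introduced here for illustration; the allowed values come from the options listed for `SECONDS_PER_SEGMENT`.

```python
import math

ALLOWED_SECONDS = (4, 8, 12)  # per-segment options listed above

def total_length(seconds_per_segment: int, num_generations: int) -> int:
    """Total duration in seconds of the concatenated video."""
    if seconds_per_segment not in ALLOWED_SECONDS:
        raise ValueError(f"seconds_per_segment must be one of {ALLOWED_SECONDS}")
    if num_generations < 1:
        raise ValueError("num_generations must be at least 1")
    return seconds_per_segment * num_generations

def generations_for(target_seconds: int, seconds_per_segment: int) -> int:
    """Smallest number of segments whose total length reaches target_seconds."""
    return math.ceil(target_seconds / seconds_per_segment)

print(total_length(8, 2))      # 16
print(generations_for(30, 8))  # 4, since 4 * 8 = 32 >= 30
```

With the defaults in the next cell (8 seconds, 2 generations) the combined video is 16 seconds long.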

In [None]:
os.environ["OPENAI_API_KEY"] = "Your API Key"  # for quick local testing only (avoid in shared notebooks)

client = OpenAI()  # uses OPENAI_API_KEY

# ---------- Planner (text model) ----------
# If you have access to "GPT-5 Thinking", set it below. Otherwise, fall back to a strong reasoning model you have.
PLANNER_MODEL = os.environ.get("PLANNER_MODEL", "gpt-5")

# ---------- Sora (video model) ----------
SORA_MODEL = "sora-2"        # or "sora-2-pro"
SIZE       = "1280x720"      # must stay constant across segments

# ---------- Your project inputs ----------
BASE_PROMPT          = "Gameplay footage of a game releasing in 2027, a car driving through a futuristic city"
SECONDS_PER_SEGMENT  = 8
NUM_GENERATIONS      = 2

# Output directory
OUT_DIR = Path("sora_ai_planned_chain")
OUT_DIR.mkdir(parents=True, exist_ok=True)

# Polling cadence
POLL_INTERVAL_SEC = 2
PRINT_PROGRESS_BAR = True

# Low-level HTTP for Sora Video API calls
API_BASE = "https://api.openai.com/v1"
HEADERS_JSON = {"Authorization": f"Bearer {os.environ['OPENAI_API_KEY']}", "Content-Type": "application/json"}
HEADERS_AUTH = {"Authorization": f"Bearer {os.environ['OPENAI_API_KEY']}"}

# 3) The planner system prompt

We’ll ask the planner model to output a clean JSON object with one prompt per generation.
The prompts contain context and the actual shot details, maximizing continuity.

This isn't super optimized and was a first pass done by GPT. If people like this notebook, let me know on X, and I'll improve this!
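
As a rough illustration of the JSON shape the planner is asked for, the sketch below parses a planner reply and checks it before anything is sent to Sora. `validate_plan` is a hypothetical helper, and the sample payload is made up for the example; neither is part of the notebook's pipeline.

```python
import json

def validate_plan(raw: str, num_generations: int, seconds: int) -> list:
    """Parse planner output and check it has the expected shape."""
    data = json.loads(raw)
    segments = data.get("segments") if isinstance(data, dict) else None
    if not isinstance(segments, list) or len(segments) != num_generations:
        raise ValueError(f"expected {num_generations} segments")
    for i, seg in enumerate(segments, start=1):
        for key in ("title", "seconds", "prompt"):
            if key not in seg:
                raise ValueError(f"segment {i} is missing '{key}'")
        if seg["seconds"] != seconds:
            raise ValueError(f"segment {i} has seconds={seg['seconds']}, expected {seconds}")
        if not str(seg["prompt"]).strip():
            raise ValueError(f"segment {i} has an empty prompt")
    return segments

# Made-up sample reply (illustrative only)
sample = json.dumps({
    "segments": [
        {"title": "Generation 1", "seconds": 8, "prompt": "Opening shot of the car."},
        {"title": "Generation 2", "seconds": 8,
         "prompt": "Context: previous scene ended on the car.\n\nPrompt: Continue from the final frame."},
    ]
})

checked = validate_plan(sample, num_generations=2, seconds=8)
print([s["title"] for s in checked])
```

Failing early on a malformed plan is cheaper than discovering the problem after a video generation has already been paid for.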

In [None]:
PLANNER_SYSTEM_INSTRUCTIONS = r"""
You are a senior prompt director for Sora 2. Your job is to transform:
- a Base prompt (broad idea),
- a fixed generation length per segment (seconds),
- and a total number of generations (N),

into **N crystal-clear shot prompts** with **maximum continuity** across segments.

Rules:
1) Return **valid JSON** only. Structure:
   {
     "segments": [
       {
         "title": "Generation 1",
         "seconds": 6,
         "prompt": "<prompt block to send into Sora>"
       },
       ...
     ]
   }
   - `seconds` MUST equal the given generation length for ALL segments.
   - `prompt` should include a **Context** section for model guidance AND a **Prompt** line for the shot itself,
     exactly like in the example below.
2) Continuity:
   - Segment 1 starts fresh from the BASE PROMPT.
   - Segment k (k>1) must **begin exactly at the final frame** of segment k-1.
   - Maintain consistent visual style, tone, lighting, and subject identity unless explicitly told to change.
3) Safety & platform constraints:
   - Do not depict real people (including public figures) or copyrighted characters.
   - Avoid copyrighted music and avoid exact trademark/logos if policy disallows them; use brand-safe wording.
   - Keep content suitable for general audiences.
4) Output only JSON (no Markdown, no backticks).
5) Keep the **Context** lines inside the prompt text (they're for the AI, not visible).
6) Make the writing specific and cinematic; describe camera, lighting, motion, and subject focus succinctly.

Below is an **EXAMPLE (verbatim)** of exactly how to structure prompts with context and continuity:

Example:
Base prompt: "Intro video for the iPhone 19"
Generation length: 6 seconds each
Total generations: 3

Clearly defined prompts with maximum continuity and context:

### Generation 1:

<prompt>
First shot introducing the new iPhone 19. Initially, the screen is completely dark. The phone, positioned vertically and facing directly forward, emerges slowly and dramatically out of darkness, gradually illuminated from the center of the screen outward, showcasing a vibrant, colorful, dynamic wallpaper on its edge-to-edge glass display. The style is futuristic, sleek, and premium, appropriate for an official Apple product reveal.
</prompt>

---

### Generation 2:

<prompt>
Context (not visible in video, only for AI guidance):

* You are creating the second part of an official intro video for Apple's new iPhone 19.
* The previous 6-second scene ended with the phone facing directly forward, clearly displaying its vibrant front screen and colorful wallpaper.

Prompt: Second shot begins exactly from the final frame of the previous scene, showing the front of the iPhone 19 with its vibrant, colorful display clearly visible. Now, smoothly rotate the phone horizontally, turning it from the front to reveal the back side. Focus specifically on the advanced triple-lens camera module, clearly highlighting its premium materials, reflective metallic surfaces, and detailed lenses. Maintain consistent dramatic lighting, sleek visual style, and luxurious feel matching the official Apple product introduction theme.
</prompt>

---

### Generation 3:

<prompt>
Context (not visible in video, only for AI guidance):

* You are creating the third and final part of an official intro video for Apple's new iPhone 19.
* The previous 6-second scene ended clearly showing the back of the iPhone 19, focusing specifically on its advanced triple-lens camera module.

Prompt: Final shot begins exactly from the final frame of the previous scene, clearly displaying the back side of the iPhone 19, with special emphasis on the triple-lens camera module. Now, have a user's hand gently pick up the phone, naturally rotating it from the back to the front and bringing it upward toward their face. Clearly show the phone smoothly and quickly unlocking via Face ID recognition, transitioning immediately to a vibrant home screen filled with updated app icons. Finish the scene by subtly fading the home screen into the iconic Apple logo. Keep the visual style consistent, premium, and elegant, suitable for an official Apple product launch.
</prompt>

---

Notice how we broke up the initial prompt into multiple prompts that provide context and continuity so this all works seamlessly.
""".strip()


# 4) Planner: ask the LLM to generate prompts (JSON)

In [None]:
def plan_prompts_with_ai(base_prompt: str, seconds_per_segment: int, num_generations: int):
    """
    Calls the Responses API to produce a JSON object:
    {
      "segments": [
        {"title": "...", "seconds": <int>, "prompt": "<full prompt block>"},
        ...
      ]
    }
    """
    # Compose a single plain-text input with the variables:
    user_input = f"""
BASE PROMPT: {base_prompt}

GENERATION LENGTH (seconds): {seconds_per_segment}
TOTAL GENERATIONS: {num_generations}

Return exactly {num_generations} segments.
""".strip()

    # Minimal Responses API call; see docs & library readme for details.
    # (If your account lacks the requested model, change PLANNER_MODEL accordingly.)
    resp = client.responses.create(
        model=PLANNER_MODEL,
        instructions=PLANNER_SYSTEM_INSTRUCTIONS,
        input=user_input,
    )

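    text = getattr(resp, "output_text", None) or ""
    if not text:
        # Fallback: collect text from the structured output items
        # (some SDK versions may expose text only on the content blocks of resp.output).
        parts = []
        for item in getattr(resp, "output", None) or []:
            for block in getattr(item, "content", None) or []:
                piece = getattr(block, "text", None)
                if isinstance(piece, str):
                    parts.append(piece)
        text = "".join(parts)
        if not text:
            raise RuntimeError("Planner returned no text; try changing PLANNER_MODEL.")

    # Decode the first JSON object in the response text, ignoring anything after it.
    start = text.find("{")
    if start == -1:
        raise ValueError("Planner did not return JSON. Inspect response and adjust instructions.")
    try:
        data, _ = json.JSONDecoder().raw_decode(text[start:])
    except json.JSONDecodeError as e:
        raise ValueError("Planner returned malformed JSON. Inspect response and adjust instructions.") from e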

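    # Basic validation and enforcement
    segments = data.get("segments", [])
    if len(segments) < num_generations:
        raise ValueError(f"Planner returned {len(segments)} segments; expected {num_generations}.")
    # Drop any extra segments beyond the requested count.
    segments = segments[:num_generations]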

    # Force durations to the requested number (some models might deviate)
    for seg in segments:
        seg["seconds"] = int(seconds_per_segment)

    return segments

segments_plan = plan_prompts_with_ai(BASE_PROMPT, SECONDS_PER_SEGMENT, NUM_GENERATIONS)

print("AI‑planned segments:\n")
for i, seg in enumerate(segments_plan, start=1):
    print(f"[{i:02d}] {seg['seconds']}s — {seg.get('title','(untitled)')}")
    print(seg["prompt"])
    print("-" * 80)


# 5) Sora helpers (create → poll → download → extract final frame)

In [None]:
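# os, time, and cv2 are used by the helpers below, so import them here as well
import os, json, time, mimetypes
from pathlib import Path
import cv2
import requests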

API_BASE = "https://api.openai.com/v1"
HEADERS_AUTH = {"Authorization": f"Bearer {os.environ['OPENAI_API_KEY']}"}

def guess_mime(path: Path) -> str:
    t = mimetypes.guess_type(str(path))[0]
    return t or "application/octet-stream"

def _dump_error(resp: requests.Response):
    rid = resp.headers.get("x-request-id", "<none>")
    try:
        body = resp.json()
    except Exception:
        body = resp.text
    return f"HTTP {resp.status_code} (request-id: {rid})\n{body}"

def create_video(prompt: str, size: str, seconds: int, model: str, input_reference: Path | None):
    """
    Always send multipart/form-data. This tends to be the most compatible with /videos,
    and also supports input_reference seamlessly.
    """
    files = {
        "model":   (None, model),
        "prompt":  (None, prompt),
        "seconds": (None, str(seconds)),
    }
    if size:
        files["size"] = (None, size)

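    if input_reference is None:
        r = requests.post(f"{API_BASE}/videos", headers=HEADERS_AUTH, files=files, timeout=300)
    else:
        # Open the reference image in a context manager so the handle is closed after upload
        with open(input_reference, "rb") as fh:
            files["input_reference"] = (Path(input_reference).name, fh, guess_mime(input_reference))
            r = requests.post(f"{API_BASE}/videos", headers=HEADERS_AUTH, files=files, timeout=300)

    if r.status_code >= 400:
        raise RuntimeError("Create video failed:\n" + _dump_error(r))
    return r.json()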

def retrieve_video(video_id: str):
    r = requests.get(f"{API_BASE}/videos/{video_id}", headers=HEADERS_AUTH, timeout=60)
    if r.status_code >= 400:
        raise RuntimeError("Retrieve video failed:\n" + _dump_error(r))
    return r.json()

def download_video_content(video_id: str, out_path: Path, variant: str = "video"):
    with requests.get(
        f"{API_BASE}/videos/{video_id}/content",
        headers=HEADERS_AUTH,
        params={"variant": variant},
        stream=True,
        timeout=600,
    ) as r:
        if r.status_code >= 400:
            raise RuntimeError("Download failed:\n" + _dump_error(r))
        with open(out_path, "wb") as f:
            for chunk in r.iter_content(chunk_size=8192):
                if chunk: f.write(chunk)
    return out_path



def poll_until_complete(job: dict, poll_interval=POLL_INTERVAL_SEC):
    video = job
    vid = video["id"]

    def bar(pct: float, width=30):
        filled = int(max(0.0, min(100.0, pct)) / 100 * width)
        return "=" * filled + "-" * (width - filled)

    while video.get("status") in ("queued", "in_progress"):
        if PRINT_PROGRESS_BAR:
            pct = float(video.get("progress", 0) or 0)
            status_text = "Queued" if video["status"] == "queued" else "Processing"
            print(f"\r{status_text}: [{bar(pct)}] {pct:5.1f}%", end="")
        time.sleep(poll_interval)
        video = retrieve_video(vid)

    if PRINT_PROGRESS_BAR:
        print()

    if video.get("status") != "completed":
        msg = (video.get("error") or {}).get("message", f"Job {vid} failed")
        raise RuntimeError(msg)
    return video


def extract_last_frame(video_path: Path, out_image_path: Path) -> Path:
    cap = cv2.VideoCapture(str(video_path))
    if not cap.isOpened():
        raise RuntimeError(f"Failed to open {video_path}")

    total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) or 0
    success, frame = False, None

    if total > 0:
        cap.set(cv2.CAP_PROP_POS_FRAMES, total - 1)
        success, frame = cap.read()
    if not success or frame is None:
        cap.release()
        cap = cv2.VideoCapture(str(video_path))
        while True:
            ret, f = cap.read()
            if not ret: break
            frame = f
            success = True
    cap.release()

    if not success or frame is None:
        raise RuntimeError(f"Could not read last frame from {video_path}")

    out_image_path.parent.mkdir(parents=True, exist_ok=True)
    ok = cv2.imwrite(str(out_image_path), frame)
    if not ok:
        raise RuntimeError(f"Failed to write {out_image_path}")
    return out_image_path
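

`poll_until_complete` above keeps looping for as long as the job reports `queued` or `in_progress`. A minimal sketch of the same loop with an overall deadline follows. `poll_with_timeout`, `fetch`, `timeout_sec`, and the injectable `sleep` and `clock` parameters are assumptions added for illustration and are not part of the notebook's helpers.

```python
import time
from typing import Callable

def poll_with_timeout(
    fetch: Callable[[], dict],
    poll_interval: float = 2.0,
    timeout_sec: float = 900.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> dict:
    """Poll fetch() until the job leaves queued/in_progress or the deadline passes."""
    deadline = clock() + timeout_sec
    job = fetch()
    while job.get("status") in ("queued", "in_progress"):
        if clock() >= deadline:
            raise TimeoutError(f"job still {job.get('status')} after {timeout_sec}s")
        sleep(poll_interval)
        job = fetch()
    if job.get("status") != "completed":
        message = (job.get("error") or {}).get("message", "job failed")
        raise RuntimeError(message)
    return job

# Simulated job that completes on the third poll (no network, no real sleeping)
states = iter([
    {"status": "queued"},
    {"status": "in_progress", "progress": 50},
    {"status": "completed"},
])
done = poll_with_timeout(lambda: next(states), sleep=lambda s: None)
print(done["status"])
```

In the notebook, `fetch` could be something like `lambda: retrieve_video(job["id"])`. Making `sleep` and `clock` injectable keeps the loop testable without waiting.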


# 6) Chain generator (use planner output; continuity via final frame)

In [None]:
def chain_generate_sora(segments, size: str, model: str):
    """
    segments: list of {"title": str, "seconds": int, "prompt": str}
    Returns list of video segment Paths.
    """
    input_ref = None
    segment_paths = []

    for i, seg in enumerate(segments, start=1):
        secs   = int(seg["seconds"])
        prompt = seg["prompt"]

        print(f"\n=== Generating Segment {i}/{len(segments)} — {secs}s ===")
        job = create_video(prompt=prompt, size=size, seconds=secs, model=model, input_reference=input_ref)
        print("Started job:", job["id"], "| status:", job["status"])

        completed = poll_until_complete(job)

        seg_path = OUT_DIR / f"segment_{i:02d}.mp4"
        download_video_content(completed["id"], seg_path, variant="video")
        print("Saved", seg_path)
        segment_paths.append(seg_path)

        # Prepare input reference (final frame) for the next segment
        frame_path = OUT_DIR / f"segment_{i:02d}_last.jpg"
        extract_last_frame(seg_path, frame_path)
        print("Extracted last frame ->", frame_path)
        input_ref = frame_path

    return segment_paths


def concatenate_segments(segment_paths, out_path: Path) -> Path:
    clips = [VideoFileClip(str(p)) for p in segment_paths]
    target_fps = clips[0].fps or 24
    result = concatenate_videoclips(clips, method="compose")
    result.write_videofile(
        str(out_path),
        codec="libx264",
        audio_codec="aac",
        fps=target_fps,
        preset="medium",
        threads=0
    )
    for c in clips:
        c.close()
    return out_path
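

Section 1 locates an `FFMPEG_BIN` for the case where MoviePy is unavailable, but `concatenate_segments` only uses MoviePy. Below is a minimal sketch of an ffmpeg-based fallback using the concat demuxer with stream copy. It assumes all segments share the same codec, resolution, and frame rate, which is plausible when `SIZE` and `SORA_MODEL` stay constant but is not verified here. `write_concat_list`, `build_concat_command`, and `concat_with_ffmpeg` are hypothetical names.

```python
import subprocess
from pathlib import Path

def write_concat_list(segment_paths, list_path: Path) -> Path:
    """Write the file list that ffmpeg's concat demuxer reads."""
    lines = []
    for p in segment_paths:
        # Single quotes inside a path must be escaped as '\'' in the list file
        escaped = str(Path(p).resolve()).replace("'", "'\\''")
        lines.append(f"file '{escaped}'")
    list_path.write_text("\n".join(lines) + "\n", encoding="utf-8")
    return list_path

def build_concat_command(ffmpeg_bin: str, list_path: Path, out_path: Path) -> list:
    """Build the ffmpeg command; -c copy joins the streams without re-encoding."""
    return [
        ffmpeg_bin, "-y",
        "-f", "concat", "-safe", "0",
        "-i", str(list_path),
        "-c", "copy",
        str(out_path),
    ]

def concat_with_ffmpeg(segment_paths, out_path: Path, ffmpeg_bin: str = "ffmpeg") -> Path:
    """Concatenate segments by invoking ffmpeg (requires an ffmpeg binary)."""
    out_path = Path(out_path)
    list_path = write_concat_list(segment_paths, out_path.with_suffix(".txt"))
    subprocess.run(build_concat_command(ffmpeg_bin, list_path, out_path), check=True)
    return out_path
```

A call such as `concat_with_ffmpeg(segment_paths, OUT_DIR / "combined.mp4", ffmpeg_bin=FFMPEG_BIN)` would stand in for the MoviePy path. Stream copy is fast, but if the segments differ in encoding parameters, re-encoding (as the MoviePy version does) is the safer choice.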


# 7) Run the whole pipeline

In [None]:
# 1) (Already ran) Plan prompts with AI -> segments_plan
# 2) Generate with Sora 2 in a chain
segment_paths = chain_generate_sora(segments_plan, size=SIZE, model=SORA_MODEL)

# 3) Concatenate
final_path = OUT_DIR / "combined.mp4"
concatenate_segments(segment_paths, final_path)
print("\nWrote combined video:", final_path)

# 4) Inline preview
display(IPyVideo(str(final_path), embed=True, width=768))
