In [2]:
from __future__ import annotations

import operator
import os
import re
from datetime import date, datetime, timedelta
from pathlib import Path
from typing import TypedDict, List, Optional, Literal, Annotated

from pydantic import BaseModel, Field

from langgraph.graph import StateGraph, START, END
from langgraph.types import Send

from langchain_openai import ChatOpenAI
from langchain_core.messages import SystemMessage, HumanMessage
from dotenv import load_dotenv
from langchain_community.tools.tavily_search import TavilySearchResults

In [None]:
# -----------------------------
# 1) Schemas
# -----------------------------
# One planned segment of the script. The orchestrator's LLM fills these in as
# Plan.tasks; fanout() serializes each with model_dump() and worker_node rebuilds it.
# Documentation lives in `#` comments on purpose: a class docstring would become part
# of the JSON schema that with_structured_output sends to the LLM.
class Task(BaseModel):
    # Expected to be unique within a plan (not enforced); reducer_node orders the
    # finished sections by this id.
    id: int
    title: str

    goal: str = Field(
        ...,
        description="One sentence describing what the viewer should understand/feel after this segment.",
    )
    # For scripts we use 'beats' (short spoken subpoints) instead of blog bullets.
    # min_length/max_length make Pydantic reject LLM output with fewer than 2 or more than 8.
    beats: List[str] = Field(
        ...,
        min_length=2,
        max_length=8,
        description="2–8 spoken beats or bullets to cover in this segment.",
    )
    # Target duration in seconds (preferred for scripts); worker_node turns it into a
    # word budget. NOTE(review): no lower bound (e.g. gt=0) is enforced here.
    target_duration_sec: int = Field(..., description="Target duration for the segment in seconds.")

    tags: List[str] = Field(default_factory=list)
    # Flags that worker_node copies into the writer prompt.
    requires_research: bool = False
    requires_citations: bool = False
    requires_code: bool = False


# Structured content plan produced by orchestrator_node via with_structured_output(Plan).
# (No class docstring on purpose: it would be sent to the LLM as the schema description.)
class Plan(BaseModel):
    # script-centric fields
    script_title: str  # reducer_node also derives the output file name from this
    platform: str  # e.g. "youtube", "podcast", "tiktok"
    tone: str

    # tells workers what genre this is (prevents drift)
    script_type: Literal["explainer", "story", "tutorial", "news_roundup", "comparison", "system_design"] = "explainer"

    # free-form constraints; worker_node passes them through to the writer prompt
    constraints: List[str] = Field(default_factory=list)
    tasks: List[Task]


# One research source, synthesized by research_node from raw Tavily results.
class EvidenceItem(BaseModel):
    title: str
    url: str  # workers are told to cite only these URLs, as [Source](URL)
    # Prefer ISO "YYYY-MM-DD". None when unknown; in open_book mode research_node drops
    # items whose date cannot be parsed.
    published_at: Optional[str] = None
    snippet: Optional[str] = None
    source: Optional[str] = None


# Structured routing decision the LLM returns inside router_node.
class RouterDecision(BaseModel):
    needs_research: bool
    mode: Literal["closed_book", "hybrid", "open_book"]  # router_node maps this to recency_days
    reason: str  # free-text rationale; router_node does not copy it into graph state
    queries: List[str] = Field(default_factory=list)
    # NOTE(review): the 3–8 range exists only in the description (no ge/le bounds), and
    # router_node does not forward this value — research_node uses its own fixed
    # max_results. TODO: wire it through or drop the field.
    max_results_per_query: int = Field(5, description="How many results to fetch per query (3–8).")


# Wrapper model so research_node can request a list of EvidenceItem via structured output.
class EvidencePack(BaseModel):
    evidence: List[EvidenceItem] = Field(default_factory=list)

In [None]:
# Shared LangGraph state for the script pipeline; each node returns a partial dict
# with only the keys it updates.
class State(TypedDict):
    topic: str

    # routing / research
    mode: str  # router_node sets "closed_book" / "hybrid" / "open_book"; run() starts it as ""
    needs_research: bool
    queries: List[str]
    # NOTE(review): annotated as dicts, but research_node actually stores EvidenceItem
    # (Pydantic) objects here. TODO: align the annotation and the producers/consumers.
    evidence: List[dict]
    plan: Optional[Plan]  # set by orchestrator_node; None before that

    # recency control
    as_of: str  # ISO date "YYYY-MM-DD" (research_node parses it with date.fromisoformat)
    recency_days: int  # look-back window; router_node picks it from the mode

    # workers
    # operator.add reducer: the one-element [(task_id, text)] lists returned by the
    # parallel worker_node calls are concatenated rather than overwriting each other.
    sections: Annotated[List[tuple[int, str]], operator.add]
    final: str  # merged script text produced by reducer_node

    # new: audience and output descriptors for router rules
    audience: Optional[dict]
    output: Optional[dict]


In [None]:
# load_dotenv was imported but never called, so keys kept in a local .env
# (OPENAI_API_KEY, TAVILY_API_KEY) were never loaded. By default it does not
# override variables that are already set in the environment.
load_dotenv()

# Shared chat model for every node. OPENAI_MODEL overrides the default model name.
llm = ChatOpenAI(model=os.environ.get("OPENAI_MODEL", "gpt-4.1-mini"))

In [None]:
# -----------------------------
# 3) Router (decide upfront)
# -----------------------------
# System prompt for router_node. Its rules reference audience.level and
# output.platform, which router_node supplies from state["audience"] / state["output"].
ROUTER_SYSTEM = """
You are a content-routing expert.

Your job is to decide whether this topic requires web research
based on volatility, recency sensitivity, and factual risk.

Definitions:
- closed_book: evergreen concepts, theory, fundamentals
- hybrid: mostly evergreen, but examples/tools/models must be current
- open_book: time-sensitive, news, rankings, weekly/monthly updates

Rules:
- If audience.level == beginner, lean toward closed_book unless clearly news.
- If output.platform == youtube and topic implies "latest", prefer hybrid/open.
- Generate 3–8 precise search queries if research is needed.
"""


def router_node(state: State) -> dict:
    """Decide up front whether the topic needs web research.

    Sends the topic, the as-of date and the optional audience/output descriptors to
    the LLM, so the audience/platform rules in ROUTER_SYSTEM can apply. The
    structured RouterDecision is then mapped onto state updates.

    (This replaces an earlier, truncated duplicate definition that returned None
    and was silently shadowed.)

    Returns:
        dict with ``needs_research``, ``mode``, ``queries`` and ``recency_days``.
    """
    topic = state["topic"]
    decider = llm.with_structured_output(RouterDecision)

    human = HumanMessage(content=(
        f"Topic: {topic}\n"
        f"As-of date: {state['as_of']}\n"
        f"Audience: {state.get('audience')}\n"
        f"Output: {state.get('output')}\n"
    ))

    decision = decider.invoke([SystemMessage(content=ROUTER_SYSTEM), human])

    # Default look-back window per mode: a week for news, ~6 weeks for hybrid,
    # and ~10 years (effectively unlimited) for evergreen topics.
    recency_days = {"open_book": 7, "hybrid": 45}.get(decision.mode, 3650)

    # open_book is time-sensitive by definition; without research the orchestrator
    # would plan a news roundup with no evidence at all.
    needs_research = decision.needs_research or decision.mode == "open_book"

    queries = [q.strip() for q in decision.queries if q and q.strip()]
    if needs_research and not queries:
        # Search for the raw topic rather than running research with nothing to search.
        queries = [topic]

    return {
        "needs_research": needs_research,
        "mode": decision.mode,
        "queries": queries,
        "recency_days": recency_days,
    }

def route_next(state: State) -> str:
    """Conditional-edge selector after the router: go to research only when requested."""
    if state["needs_research"]:
        return "research"
    return "orchestrator"

In [None]:
# -----------------------------
# 4) Research (Tavily)
# -----------------------------
def _tavily_search(query: str, max_results: int = 5) -> List[dict]:
    """
    Uses TavilySearchResults if installed and TAVILY_API_KEY is set.
    Returns list of dict with common fields. Note: published date is often missing.
    """
    tool = TavilySearchResults(max_results=max_results)
    results = tool.invoke({"query": query})

    normalized: List[dict] = []
    for r in results or []:
        normalized.append(
            {
                "title": r.get("title") or "",
                "url": r.get("url") or "",
                "snippet": r.get("content") or r.get("snippet") or "",
                "published_at": r.get("published_date") or r.get("published_at"),
                "source": r.get("source"),
            }
        )
    return normalized


def _iso_to_date(s: Optional[str]) -> Optional[date]:
    if not s:
        return None
    try:
        return date.fromisoformat(s[:10])
    except Exception:
        return None


# System prompt for the EvidencePack extraction step in research_node. The
# "published_at=null, do NOT guess" rule matters because open_book mode later drops
# items without a parseable date.
RESEARCH_SYSTEM = """You are a research synthesizer for technical writing.

Given raw web search results, produce a deduplicated list of EvidenceItem objects.

Rules:
- Only include items with a non-empty url.
- Prefer relevant + authoritative sources (company blogs, docs, reputable outlets).
- Extract/normalize published_at as ISO (YYYY-MM-DD) if you can infer it from title/snippet.
  If you can't infer a date reliably, set published_at=null (do NOT guess).
- Keep snippets short.
- Deduplicate by URL.
"""

def research_node(state: State) -> dict:
    """Search the web for the router's queries and distill the hits into evidence.

    Steps:
      1. Run up to 10 queries through ``_tavily_search`` (6 hits each). Drop
         URL-less and duplicate-URL hits before prompting.
      2. Ask the LLM (RESEARCH_SYSTEM) to turn the raw hits into EvidenceItems.
      3. Keep only items whose URL the search step actually returned, deduplicated
         by URL (ignoring a trailing "/").
      4. In ``open_book`` mode, keep only items with a parseable date inside the
         ``recency_days`` window ending at ``as_of``.

    Returns:
        ``{"evidence": [EvidenceItem, ...]}``, possibly empty.
    """
    queries = (state.get("queries") or [])[:10]
    max_results = 6

    # Pre-dedupe raw hits by URL: shorter prompt, same information.
    raw_by_url: dict = {}
    for q in queries:
        for hit in _tavily_search(q, max_results=max_results):
            url = hit.get("url")
            if url and url not in raw_by_url:
                raw_by_url[url] = hit
    raw_results = list(raw_by_url.values())

    if not raw_results:
        return {"evidence": []}

    extractor = llm.with_structured_output(EvidencePack)
    pack = extractor.invoke(
        [
            SystemMessage(content=RESEARCH_SYSTEM),
            HumanMessage(
                content=(
                    f"As-of date: {state['as_of']}\n"
                    f"Recency days: {state['recency_days']}\n\n"
                    f"Raw results:\n{raw_results}"
                )
            ),
        ]
    )

    # Guard against fabricated sources: workers cite these URLs verbatim, so only
    # accept URLs that search actually returned. Comparison ignores a trailing "/".
    known_urls = {u.rstrip("/") for u in raw_by_url}
    dedup: dict = {}
    for e in pack.evidence:
        key = (e.url or "").rstrip("/")
        if key and key in known_urls:
            dedup[key] = e
    evidence = list(dedup.values())

    # HARD RECENCY FILTER for open_book weekly roundup:
    # keep only items with a parseable ISO date and within the window.
    mode = state.get("mode", "closed_book")
    if mode == "open_book":
        as_of = date.fromisoformat(state["as_of"])
        cutoff = as_of - timedelta(days=int(state["recency_days"]))
        evidence = [
            e for e in evidence
            if (d := _iso_to_date(e.published_at)) is not None and d >= cutoff
        ]

    return {"evidence": evidence}

In [None]:
# -----------------------------
# 5) Orchestrator (Plan)
# -----------------------------
# System prompt for orchestrator_node. The field names used here (beats,
# target_duration_sec, ids) must match the Plan/Task schema the model fills in.
# Earlier versions named a nonexistent ContentPlan/target_size and started with a
# truncated duplicate of the youtube rules.
ORCH_SYSTEM = """
You are a senior content architect.

You design a high-retention content plan tailored to:
- audience profile
- platform
- attention span
- depth preference

HARD RULES:
- Create 5–9 tasks.
- Give tasks unique integer ids starting at 1, in playback order.
- Each task must have:
  - clear goal
  - 2–8 concrete beats (short spoken subpoints)
  - target_duration_sec (segment length in seconds)

PLATFORM RULES:
- blog:
  - logical flow
  - depth scales with audience.level
- youtube:
  - first task MUST be a hook (0–15s)
  - include pattern interrupts
  - shorter segments for short attention_span
- linkedin/twitter:
  - compress ideas
  - focus on insight, not completeness

AUDIENCE RULES:
- beginner:
  - avoid jargon unless explained
  - include intuition
- expert:
  - assume prior knowledge
  - include edge cases, tradeoffs
- founder/general:
  - focus on impact, cost, decisions

DEPTH RULES:
- surface → conceptual overview
- deep → implementation reasoning
- hardcore → internals, failure modes, tradeoffs

QUALITY BAR:
- Tasks must NOT overlap.
- At least one task should include:
  - failure modes OR
  - performance/cost OR
  - security implications

OUTPUT MUST MATCH THE Plan SCHEMA EXACTLY.
"""


def orchestrator_node(state: State) -> dict:
    """Ask the LLM for a segment-by-segment Plan of the script.

    The prompt includes the topic, mode and recency window, the audience/output
    descriptors (the ORCH_SYSTEM rules are written against them), and up to 16
    evidence items.

    Returns:
        ``{"plan": Plan}``. In ``open_book`` mode ``plan.script_type`` is forced to
        ``"news_roundup"`` even if the model chose something else.
    """
    planner = llm.with_structured_output(Plan)
    evidence = state.get("evidence") or []
    mode = state.get("mode") or "closed_book"

    # open_book topics are news-like: pin the genre so the plan cannot drift.
    forced_type = "news_roundup" if mode == "open_book" else None

    # Evidence may hold EvidenceItem models (from research_node) or plain dicts.
    evidence_dicts = [
        e.model_dump() if hasattr(e, "model_dump") else dict(e)
        for e in evidence[:16]
    ]

    plan = planner.invoke(
        [
            SystemMessage(content=ORCH_SYSTEM),
            HumanMessage(
                content=(
                    f"Topic: {state['topic']}\n"
                    f"Mode: {mode}\n"
                    f"As-of: {state['as_of']} (recency_days={state['recency_days']})\n"
                    f"Audience: {state.get('audience')}\n"
                    f"Output: {state.get('output')}\n"
                    f"{'Force script_type=news_roundup' if forced_type else ''}\n\n"
                    f"Evidence (ONLY use for fresh claims; may be empty):\n"
                    f"{evidence_dicts}\n\n"
                    f"Instruction: If mode=open_book, your plan must NOT drift into a tutorial."
                )
            ),
        ]
    )

    # Plan's genre field is script_type. Assigning the old, undeclared blog_kind
    # attribute raises on a Pydantic model and crashed every open_book run.
    if forced_type:
        plan.script_type = forced_type

    return {"plan": plan}


In [None]:
# -----------------------------
# 6) Fanout
# -----------------------------
def fanout(state: State):
    """Fan the plan out into one ``worker`` call per task.

    Each ``Send`` payload carries plain, serializable data: the task, the whole plan,
    all evidence, and the topic/mode/recency settings that worker_node reads.

    Raises:
        ValueError: if the orchestrator has not produced a plan yet.
    """
    plan = state["plan"]
    if plan is None:
        # Explicit error instead of `assert`, which is stripped under `python -O`.
        raise ValueError("fanout called without a plan.")

    # The plan and evidence are identical for every worker, so serialize them once.
    # Evidence may hold EvidenceItem models or plain dicts.
    plan_dict = plan.model_dump()
    evidence_dicts = [
        e.model_dump() if hasattr(e, "model_dump") else dict(e)
        for e in (state.get("evidence") or [])
    ]

    return [
        Send(
            "worker",
            {
                "task": task.model_dump(),
                "topic": state["topic"],
                "mode": state["mode"],
                "as_of": state["as_of"],
                "recency_days": state["recency_days"],
                "plan": plan_dict,
                "evidence": evidence_dicts,
            },
        )
        for task in plan.tasks
    ]

In [None]:
# -----------------------------
# 7) Worker (write one section)
# -----------------------------
# System prompt for worker_node (one segment per call). NOTE(review): the ~2.17
# words/sec pacing stated here is also hard-coded in worker_node's word estimate;
# keep the two in sync.
WORKER_SYSTEM =  """
You are a professional scriptwriter for spoken-word content.

Write EXACTLY ONE spoken content segment.

STRICT RULES:
- Follow the Task goal and beats exactly and in order.
- Respect target_duration_sec (±15%).
  - Use ~130 words/min as default pacing (approx 2.17 words/sec) when asked to estimate words.
- Output ONLY the spoken segment, no H1/H2 headings, no article-style metadata.

STYLE:
- Spoken language, short sentences, natural rhythm.
- Use stage directions in brackets when needed: [PAUSE], [ON SCREEN], [B-ROLL], [SFX].
- Include an optional short "read time" or estimated seconds in a single-line comment at the top like: <!-- ~45s --> (only if requested).

GROUNDING:
- If requires_citations==True or mode==open_book, cite ONLY the provided Evidence URLs in-line as: [Source](URL).
- If not supported by evidence, write: "Not found in provided sources."

CODE:
- If requires_code==True, include a short code block and a one-sentence explainer.

TONE CONTROL:
- calm, analytical, authoritative, hype — follow the plan.
"""


def worker_node(payload: dict) -> dict:
    """Write the spoken text for a single planned segment.

    ``payload`` is the dict that ``fanout`` sends: task, plan, evidence, topic, mode,
    as_of and recency_days. Task, plan and evidence are re-validated into their
    Pydantic models here. The writer LLM is prompted with WORKER_SYSTEM plus a
    plain-text summary of the plan and this segment.

    Returns:
        ``{"sections": [(task_id, text)]}``. It is a one-element list so the
        ``operator.add`` reducer on ``State.sections`` can concatenate the outputs
        of parallel workers.
    """
    segment = Task(**payload["task"])
    script_plan = Plan(**payload["plan"])
    sources = [EvidenceItem(**item) for item in payload.get("evidence", [])]

    # Beats become a bulleted list, each on its own "- " line.
    bullet_block = "\n- " + "\n- ".join(segment.beats)

    # ~130 spoken words/min is about 2.17 words/sec; this is only a length hint.
    word_budget = int(segment.target_duration_sec * 2.17)

    # At most 20 sources, one per line: "- title | url | date".
    source_block = "\n".join(
        [
            f"- {src.title} | {src.url} | {src.published_at or 'date:unknown'}".strip()
            for src in sources[:20]
        ]
    )

    prompt = (
        f"Script title: {script_plan.script_title}\n"
        f"Platform: {script_plan.platform}\n"
        f"Tone: {script_plan.tone}\n"
        f"Script type: {script_plan.script_type}\n"
        f"Constraints: {script_plan.constraints}\n"
        f"Topic: {payload['topic']}\n"
        f"Mode: {payload.get('mode', 'closed_book')}\n"
        f"As-of: {payload.get('as_of')} (recency_days={payload.get('recency_days')})\n\n"
        f"Segment title: {segment.title}\n"
        f"Goal: {segment.goal}\n"
        f"Target duration (sec): {segment.target_duration_sec} (~{word_budget} words)\n"
        f"Tags: {segment.tags}\n"
        f"requires_research: {segment.requires_research}\n"
        f"requires_citations: {segment.requires_citations}\n"
        f"requires_code: {segment.requires_code}\n"
        f"Beats:{bullet_block}\n\n"
        f"Evidence (ONLY use these URLs when citing):\n{source_block}\n"
    )

    reply = llm.invoke([SystemMessage(content=WORKER_SYSTEM), HumanMessage(content=prompt)])
    return {"sections": [(segment.id, reply.content.strip())]}

In [None]:
# -----------------------------
# 8) Reducer (merge + save)
# -----------------------------
def _safe_filename(title: str, default: str = "script", max_len: int = 120) -> str:
    """Turn a free-form (LLM-generated) title into a filesystem-safe file stem."""
    # Replace path separators and other punctuation with "_" so the title cannot
    # point outside the working directory or be an invalid name (e.g. ":" on Windows).
    stem = re.sub(r"[^\w\-. ]+", "_", title or "")
    return stem[:max_len].strip(" ._") or default


def reducer_node(state: State) -> dict:
    """Merge worker sections in task-id order, save them, and return the script.

    The text is "<script_title>\\n\\n<sections...>\\n". It is written as UTF-8 to
    ``<sanitized title>.txt`` in the current working directory.

    Returns:
        ``{"final": final_script}``.

    Raises:
        ValueError: if ``state["plan"]`` is None.
    """
    plan = state["plan"]
    if plan is None:
        raise ValueError("Reducer called without a plan.")

    # Workers run in parallel, so sections arrive in arbitrary order; sort by task id.
    ordered_sections = [
        text for _, text in sorted(state.get("sections") or [], key=lambda s: s[0])
    ]
    body = "\n\n".join(ordered_sections).strip()

    # For scripts use a header line and plain text
    final_script = f"{plan.script_title}\n\n{body}\n"
    # The title comes from the LLM, so it is sanitized before use as a file name.
    Path(f"{_safe_filename(plan.script_title)}.txt").write_text(final_script, encoding="utf-8")

    return {"final": final_script}


In [None]:
# -----------------------------
# 9) Build graph
# -----------------------------
# Pipeline: router -> (research?) -> orchestrator -> N parallel workers -> reducer
g = StateGraph(State)

for _node_name, _node_fn in (
    ("router", router_node),
    ("research", research_node),
    ("orchestrator", orchestrator_node),
    ("worker", worker_node),
    ("reducer", reducer_node),
):
    g.add_node(_node_name, _node_fn)

# Research is optional: route_next picks the branch from needs_research.
g.add_edge(START, "router")
g.add_conditional_edges(
    "router",
    route_next,
    {"research": "research", "orchestrator": "orchestrator"},
)
g.add_edge("research", "orchestrator")

# fanout returns one Send("worker", ...) per planned task; the reducer follows the workers.
g.add_conditional_edges("orchestrator", fanout, ["worker"])
for _src, _dst in (("worker", "reducer"), ("reducer", END)):
    g.add_edge(_src, _dst)

# Don't leave the loop variables behind in the notebook namespace.
del _node_name, _node_fn, _src, _dst

app = g.compile()

app

In [None]:
def run(topic: str, as_of: Optional[str] = None, audience: Optional[dict] = None, output: Optional[dict] = None):
    """Run the whole pipeline for ``topic`` and print a short summary.

    Args:
        topic: Subject of the script.
        as_of: ISO date ("YYYY-MM-DD") used for recency decisions; defaults to today.
        audience: Free-form audience descriptor placed into graph state
            (default ``{"level": "beginner"}``).
        output: Free-form output descriptor placed into graph state
            (default ``{"platform": "youtube", "delivery_style": "calm"}``).

    Returns:
        The final graph state returned by ``app.invoke``.
    """
    if as_of is None:
        as_of = date.today().isoformat()
    if audience is None:
        audience = {"level": "beginner"}
    if output is None:
        output = {"platform": "youtube", "delivery_style": "calm"}

    out = app.invoke(
        {
            "topic": topic,
            "mode": "",
            "audience": audience,
            "output": output,
            "needs_research": False,
            "queries": [],
            "evidence": [],
            "plan": None,
            "as_of": as_of,
            "recency_days": 7,   # router may overwrite
            "sections": [],
            "final": "",
        }
    )

    plan: Plan = out["plan"]
    evidence = out.get("evidence") or []
    print("\n" + "=" * 100)
    print("TOPIC:", topic)
    print("AS_OF:", out.get("as_of"), "RECENCY_DAYS:", out.get("recency_days"))
    print("MODE:", out.get("mode"))
    # Plan's genre field is script_type; the old plan.blog_kind raised AttributeError.
    print("SCRIPT_TYPE:", plan.script_type)
    print("NEEDS_RESEARCH:", out.get("needs_research"))
    print("QUERIES:", (out.get("queries") or [])[:6])
    print("EVIDENCE_COUNT:", len(evidence))
    if evidence:
        # Evidence may be EvidenceItem models or plain dicts, depending on the producer.
        sample = [e.model_dump() if hasattr(e, "model_dump") else dict(e) for e in evidence[:2]]
        print("EVIDENCE_SAMPLE:", sample)
    print("TASKS:", len(plan.tasks))
    # The reducer saves plain text (.txt), not markdown.
    print("SAVED_CHARS:", len(out.get("final", "")))
    print("=" * 100 + "\n")

    return out

In [None]:
if __name__ == "__main__":
    # Demo: a beginner-friendly YouTube explainer; the router decides whether to research.
    run(
        topic="Self Attention explained intuitively",
        audience=dict(
            level="beginner",
            persona="student",
            attention_span="medium",
            prefers_examples=True,
            prefers_code=False,
        ),
        output=dict(
            platform="youtube",
            delivery_style="calm",
            depth="surface",
            include_cta=True,
        ),
    )
