## Importing libraries and environment variables

In [1]:
from __future__ import annotations

import operator
import os
import re
from datetime import date, timedelta
from pathlib import Path
from typing import TypedDict, List, Optional, Literal, Annotated

from pydantic import BaseModel, Field

from langgraph.graph import StateGraph, START, END
from langgraph.types import Send

from langchain_openai import ChatOpenAI
from langchain_core.messages import SystemMessage, HumanMessage
from langchain_community.tools.tavily_search import TavilySearchResults
from dotenv import load_dotenv

load_dotenv()


True

## Pydantic Schemas

In [15]:
#-----------------------------
# 1) Schemas
#----------------------------
class Task(BaseModel):
    # One section of the blog outline; instances live in Plan.tasks.
    id: int
    title: str
    # `...` marks the field as required (no default value).
    goal: str = Field(
        ...,
        description = "One sentence describing what the reader should be able to do/understand after this section.",
    )
    # List size is validated: between 3 and 6 entries.
    bullets: List[str] = Field(
        ...,
        min_length=3,
        max_length=6,
        description="3–6 concrete, non-overlapping subpoints to cover in this section.",
    )
    # The 120–550 range is only stated in the description text; no ge/le bounds are enforced here.
    target_words: int = Field(..., description="Target word count for this section (120–550).")
    
    # Optional metadata; tags defaults to a fresh empty list and every flag defaults to False.
    tags: List[str] = Field(default_factory=list)
    requires_research: bool = False
    requires_citations: bool = False
    requires_code: bool = False
    

class Plan(BaseModel):
    # Whole-blog outline; orchestrator_node requests it via llm.with_structured_output(Plan).
    blog_title: str
    audience: str
    tone: str
    # Restricted to the five listed kinds; defaults to "explainer" when omitted.
    blog_kind: Literal["explainer", "tutorial", "news_roundup", "comparison", "system_design"] = "explainer"
    constraints: List[str] = Field(default_factory=list)
    # Required list of Task sections; no length bound is declared on this field.
    tasks: List[Task]

class EvidenceItem(BaseModel):
    # One web source kept as evidence; only title and url are required.
    title: str
    url: str
    published_at: Optional[str] = None  # keep if Tavily provides; DO NOT rely on it
    snippet: Optional[str] = None
    source: Optional[str] = None
    
class RouterDecision(BaseModel):
    # Structured output of the routing LLM call made in router_node.
    # Note: this field is spelled `needs_research`; State spells its flag `need_research`.
    needs_research: bool
    mode: Literal["closed_book", "hybrid", "open_book"]
    # Search queries to run; defaults to an empty list.
    queries: List[str] = Field(default_factory=list)

class EvidencePack(BaseModel):
    # Wrapper so the LLM can return a list of EvidenceItem as one structured object (see research_node).
    evidence: List[EvidenceItem] = Field(default_factory=list)
    

## LangGraph State Schema

In [3]:
class State(TypedDict):
    """Shared state schema handed to StateGraph; each node returns a partial update of these keys."""
    topic: str
    
    # routing / research
    mode: str
    need_research: bool  # read by route_next() to choose the next node
    queries: List[str]
    evidence: List[EvidenceItem]
    plan: Optional[Plan]
    
    # workers
    # Annotated with operator.add so list updates from several worker runs are
    # combined by concatenation rather than overwritten (LangGraph reducer convention).
    sections: Annotated[List[tuple[int,str]], operator.add] # (task_id, section_md)
    
    # reducer / image
    # NOTE(review): these keys are not written by any node visible in this part of the notebook.
    merged_md: str
    md_with_placeholders: str
    image_specs: List[dict]
    final: str
    

    

## LLM Client (Gemini via the OpenAI-compatible API)

In [4]:
# Shared chat model used by every node below.
# ChatOpenAI is pointed at Google's OpenAI-compatible Gemini endpoint via base_url.
# The key is read from GEMINI_API_KEY; os.getenv returns None when the variable is unset.
llm = ChatOpenAI(
    model="gemini-3-flash-preview",
    api_key = os.getenv("GEMINI_API_KEY"),
    base_url= "https://generativelanguage.googleapis.com/v1beta/openai/"
)

In [5]:
# Smoke test: one (role, content) round-trip to confirm the client and API key work.
# This makes a live network call each time the cell runs.
messages = [
    (
        "system",
        "You are a helpful assistant that translates English to French. Translate the user sentence.",
    ),
    ("human", "I love programming."),
]
ai_msg = llm.invoke(messages)
# Last expression of the cell: displays the reply text.
ai_msg.text

"J'adore la programmation."

## LangGraph Graph Architecture

In [19]:
#----------------------
# build main graph
#-----------------------
# NOTE(review): this cell uses router_node, route_next, research_node, orchestrator_node
# and fanout, which are defined in cells further down, plus worker_node and
# reducer_subgraph, which are not defined in the visible part of the notebook.
# Run those definitions first; the recorded output of this cell is a NameError.
g = StateGraph(State)

# Register the nodes.
g.add_node("router", router_node)
g.add_node("research", research_node)
g.add_node("orchestrator", orchestrator_node)
g.add_node("worker", worker_node)
g.add_node("reducer", reducer_subgraph)

# Wire the edges: START -> router -> (research ->) orchestrator -> worker(s) -> reducer -> END.
g.add_edge(START, "router")
# route_next returns "research" or "orchestrator"; the mapping sends each label to the same-named node.
g.add_conditional_edges("router", route_next, {"research":"research","orchestrator":"orchestrator"})
g.add_edge("research", "orchestrator")
# fanout returns a list of Send("worker", ...) objects, one per planned task.
g.add_conditional_edges("orchestrator", fanout, ["worker"])
g.add_edge("worker", "reducer")
g.add_edge("reducer",END)

app = g.compile()
# Last expression of the cell: displays the compiled graph.
app


NameError: name 'orchestrator_node' is not defined

In [8]:
# System prompt for router_node. The three mode names match the Literal values of
# RouterDecision.mode, and the reply is parsed into a RouterDecision.
ROUTER_SYSTEM =  """You are a routing module for a technical blog planner.

Decide whether web research is needed BEFORE planning.

Modes:
- closed_book (needs_research=false):
  Evergreen topics where correctness does not depend on recent facts (concepts, fundamentals).
- hybrid (needs_research=true):
  Mostly evergreen but needs up-to-date examples/tools/models to be useful.
- open_book (needs_research=true):
  Mostly volatile: weekly roundups, "this week", "latest", rankings, pricing, policy/regulation.

If needs_research=true:
- Output 3–10 high-signal queries.
- Queries should be scoped and specific (avoid generic queries like just "AI" or "LLM").
- If user asked for "last week/this week/latest", reflect that constraint IN THE QUERIES.
"""

def router_node(state: State) -> dict:
    """Decide whether the topic needs web research before planning.

    Sends the topic to the LLM with ``ROUTER_SYSTEM`` as the system prompt and
    parses the reply into a ``RouterDecision``.

    Args:
        state: Graph state; only ``state["topic"]`` is read.

    Returns:
        Partial state update with the keys ``need_research``, ``mode`` and
        ``queries``.
    """
    decider = llm.with_structured_output(RouterDecision)
    decision = decider.invoke(
        [
            SystemMessage(content=ROUTER_SYSTEM),
            HumanMessage(content=f"Topic: {state['topic']}"),
        ]
    )

    return {
        # State and route_next() name this flag "need_research" (no "s"),
        # while RouterDecision names its field "needs_research". Write the
        # State key so the conditional edge after the router can read it.
        "need_research": decision.needs_research,
        "mode": decision.mode,
        "queries": decision.queries,
    }
    

In [10]:
def route_next(state: State) -> str:
    """Pick the node that follows the router.

    Returns ``"research"`` when ``state["need_research"]`` is truthy and
    ``"orchestrator"`` otherwise.
    """
    if state["need_research"]:
        return "research"
    return "orchestrator"

In [None]:
# -----------------------------
#  Research (Tavily) 
# -----------------------------

def tavily_search(query: str, max_results: int = 5) -> List[dict]:
    """Run one Tavily web search and normalise the hits.

    Args:
        query: Search string sent to the Tavily tool.
        max_results: Maximum number of hits requested from the tool.

    Returns:
        A list of dicts with the keys ``title``, ``url``, ``snippet``,
        ``published_at`` and ``source``. Empty when the tool returns nothing
        usable.
    """
    tool = TavilySearchResults(max_results=max_results)
    results = tool.invoke({"query": query})

    # NOTE(review): the tool appears to return a plain string instead of a list
    # when the search fails -- confirm against the installed version. Treat any
    # non-list payload as "no results" rather than crashing on `.get`.
    if not isinstance(results, list):
        return []

    return [
        {
            "title": hit.get("title") or "",
            # This key was previously misspelled "uu rl", so no hit carried a usable URL.
            "url": hit.get("url") or "",
            "snippet": hit.get("content") or hit.get("snippet") or "",
            "published_at": hit.get("published_date") or hit.get("published_at"),
            "source": hit.get("source"),
        }
        for hit in results
        if isinstance(hit, dict)
    ]


In [18]:
# -------------------------------------------------------------------------------#
#                  Research ( Using Tavily Search Engine API)
# -------------------------------------------------------------------------------#
# System prompt for research_node: the LLM turns the raw search hits into an
# EvidencePack (a list of EvidenceItem).
RESEARCH_SYSTEM = """You are a research synthesizer for technical writing.

Given raw web search results, produce a deduplicated list of EvidenceItem objects.

Rules:
- Only include items with a non-empty url.
- Prefer relevant + authoritative sources (company blogs, docs, reputable outlets).
- If a published date is explicitly present in the result payload, keep it as YYYY-MM-DD.
  If missing or unclear, set published_at=null. Do NOT guess.
- Keep snippets short.
- Deduplicate by URL.
"""

def research_node(state: State) -> dict:
    """Run the router's search queries and distil the hits into evidence.

    Each query is sent to ``tavily_search``. The combined raw hits are then
    given to the LLM, which returns an ``EvidencePack``. Items are finally
    deduplicated by URL in code as well, independent of the prompt.

    Args:
        state: Graph state; only ``state["queries"]`` is read (a missing or
            ``None`` value is treated as no queries).

    Returns:
        Partial state update ``{"evidence": [...]}``. The list is empty when
        there are no queries or no search hits.
    """
    max_queries = 10  # cap on search calls per run
    max_results = 6   # hits requested per query

    # Take at most the first `max_queries` queries from state.
    queries = (state.get("queries") or [])[:max_queries]

    raw_results: List[dict] = []
    for query in queries:
        # tavily_search's keyword is `max_results`; the earlier `max_result=`
        # spelling raised TypeError on the first query.
        raw_results.extend(tavily_search(query, max_results=max_results))

    if not raw_results:
        return {"evidence": []}

    extractor = llm.with_structured_output(EvidencePack)
    pack = extractor.invoke(
        [
            SystemMessage(content=RESEARCH_SYSTEM),
            HumanMessage(content=f"Raw results:\n{raw_results}"),
        ]
    )

    # Deduplicate by URL: drop items without a URL; for repeated URLs the last
    # item wins but keeps the position of the first occurrence.
    dedup = {}
    for item in pack.evidence:
        if item.url:
            dedup[item.url] = item

    return {"evidence": list(dedup.values())}
      



In [21]:
# -----------------------------
# 5) Orchestrator (Plan)
# -----------------------------

# System prompt for orchestrator_node; the reply is parsed into a Plan.
# The mode names used under "Grounding rules" match the Literal values of RouterDecision.mode.
ORCH_SYSTEM = """You are a senior technical writer and developer advocate.
Your job is to produce a highly actionable outline for a technical blog post.

Hard requirements:
- Create 5–9 sections (tasks) suitable for the topic and audience.
- Each task must include:
  1) goal (1 sentence)
  2) 3–6 bullets that are concrete, specific, and non-overlapping
  3) target word count (120–550)

Quality bar:
- Assume the reader is a developer; use correct terminology.
- Bullets must be actionable: build/compare/measure/verify/debug.
- Ensure the overall plan includes at least 2 of these somewhere:
  * minimal code sketch / MWE (set requires_code=True for that section)
  * edge cases / failure modes
  * performance/cost considerations
  * security/privacy considerations (if relevant)
  * debugging/observability tips

Grounding rules:
- Mode closed_book: keep it evergreen; do not depend on evidence.
- Mode hybrid:
  - Use evidence for up-to-date examples (models/tools/releases) in bullets.
  - Mark sections using fresh info as requires_research=True and requires_citations=True.
- Mode open_book:
  - Set blog_kind = "news_roundup".
  - Every section is about summarizing events + implications.
  - DO NOT include tutorial/how-to sections unless user explicitly asked for that.
  - If evidence is empty or insufficient, create a plan that transparently says "insufficient sources"
    and includes only what can be supported.

Output must strictly match the Plan schema.
"""

def orchestrator_node(state: State) -> dict:
    """Ask the LLM for a structured ``Plan`` for the current topic.

    The human message carries the topic, the routing mode (``"closed_book"``
    when the state has no ``mode``) and the first 16 evidence items serialised
    with ``model_dump()``.

    Args:
        state: Graph state; reads ``topic`` and, if present, ``mode`` and
            ``evidence``.

    Returns:
        Partial state update ``{"plan": <Plan>}``.
    """
    research_mode = state.get("mode", "closed_book")
    evidence_dump = [item.model_dump() for item in state.get("evidence", [])]

    prompt = (
        f"Topic: {state['topic']}\n"
        f"Mode: {research_mode}\n\n"
        f"Evidence (ONLY use for fresh claims; may be empty):\n"
        f"{evidence_dump[:16]}"
    )

    structured_llm = llm.with_structured_output(Plan)
    plan = structured_llm.invoke(
        [
            SystemMessage(content=ORCH_SYSTEM),
            HumanMessage(content=prompt),
        ]
    )
    return {"plan": plan}


In [22]:
#-----------------------------------------------------------------------------------------------------------#
#---------------------------------FANOUT-CODE FOR AUTO SCALING AND PARALLELISM------------------------------#

def fanout(state: State):
    """Build one ``Send`` to the ``"worker"`` node per task in the plan.

    Each payload carries the dumped task together with the topic, mode, dumped
    plan and dumped evidence, so a worker receives everything it needs in its
    own input.

    Args:
        state: Graph state; reads ``plan``, ``topic``, ``mode`` and, if
            present, ``evidence``.

    Returns:
        A list of ``Send`` objects, in task order; empty when the plan has no
        tasks.
    """
    dispatches = []
    for task in state["plan"].tasks:
        # Plan and evidence are dumped afresh for every task, so payloads never share objects.
        payload = {
            "task": task.model_dump(),
            "topic": state["topic"],
            "mode": state["mode"],
            "plan": state["plan"].model_dump(),
            "evidence": [item.model_dump() for item in state.get("evidence", [])],
        }
        dispatches.append(Send("worker", payload))
    return dispatches
