In [1]:
import os
import getpass

os.environ["ANTHROPIC_API_KEY"] = getpass.getpass("Enter your Anthropic API key: ")
os.environ["TAVILY_API_KEY"] = getpass.getpass("Enter your Tavily API key: ")
os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter your OpenAI API Key: ")
os.environ["COHERE_API_KEY"] = getpass.getpass("Enter your Cohere API key: ")


#### A Note about the PDFs:

I personally got direct permission from Ron Kohavi to use his book, papers, and LinkedIn posts (which I scrapped) as part of this PDF collection. When I share this notebook on Github, I will not include the 'data' subdirectory to keep it private and secure

## State

The state structure organizes all the information needed to generate and refine a report on a given topic through multiple steps. Here’s a breakdown:

- **Report-Level State:**  
  - **Topic:** The overall subject of the report.  
  - **Feedback on Report Plan:** Comments or evaluations on the preliminary report outline.  
  - **Sections:** A list of individual report sections (each with its own name, description, whether research is needed, and content).  
  - **Completed Sections:** A collection of sections that have been fully developed, which is also used by downstream processes (e.g., a Send() API).  
  - **Research-Based Content:** A string that gathers content produced from web research, which may be integrated into the final report.  
  - **Final Report:** The completed report after all sections and revisions are combined.

- **Section-Level State:**  
  For each report section, the state captures:  
  - **Section Details:** Including the section’s name, a brief overview (description), a flag indicating if web research should be performed, and the actual content.  
  - **Search Process:** The number of search iterations that have been executed and a list of search queries used to gather information.  
  - **Source Content:** A formatted string containing the relevant material obtained from current iteration's web searches (for writer)
  - **Source Content All:** A formatted string containing the relevant material obtained from all iterations' web searches (for user display)
  - **Integration:** Both the research-derived content and the list of fully completed sections are tracked to ensure they can be merged into the overall report.

- **Search Queries and Feedback:**  
  - **Search Queries:** Represented as individual query objects, these encapsulate the strings used to perform web searches.  
  - **Feedback:** After generating parts of the report, feedback is provided as a grade (either "pass" or "fail") along with any follow-up queries to improve the work.

In [57]:
from typing import Annotated, List, TypedDict, Literal
from pydantic import BaseModel, Field
import operator

class Section(BaseModel):
    name: str = Field(
        description="Name for this section of the report.",
    )
    description: str = Field(
        description="Brief overview of the main topics and concepts to be covered in this section.",
    )
    research: bool = Field(
        description="Whether to perform web research for this section of the report."
    )
    content: str = Field(
        description="The content of the section."
    )   
    sources: str = Field(
        default="", 
        description="All sources used for this section"
    )

class Sections(BaseModel):
    sections: List[Section] = Field(
        description="Sections of the report.",
    )

class SearchQuery(BaseModel):
    search_query: str = Field(None, description="Query for web search.")

class Queries(BaseModel):
    queries: List[SearchQuery] = Field(
        description="List of search queries.",
    )

class Feedback(BaseModel):
    grade: Literal["pass","fail"] = Field(
        description="Evaluation result indicating whether the response meets requirements ('pass') or needs revision ('fail')."
    )
    follow_up_queries: List[SearchQuery] = Field(
        description="List of follow-up search queries.",
    )

class ReportStateInput(TypedDict):
    topic: str # Report topic
    
class ReportStateOutput(TypedDict):
    final_report: str # Final report

class ReportState(TypedDict):
    topic: str # Report topic    
    feedback_on_report_plan: str # Feedback on the report plan
    sections: list[Section] # List of report sections 
    completed_sections: Annotated[list, operator.add] # Send() API key
    report_sections_from_research: str # String of any completed sections from research to write final sections
    final_report: str # Final report

class SectionState(TypedDict):
    topic: str # Report topic
    section: Section # Report section  
    search_iterations: int # Number of search iterations done
    search_queries: list[SearchQuery] # List of search queries
    source_str: str # String of formatted source content from current iteration web search (for writer)
    source_str_all: str  # All accumulated sources (for user display)
    report_sections_from_research: str # String of any completed sections from research to write final sections
    completed_sections: list[Section] # Final key we duplicate in outer state for Send() API

class SectionOutputState(TypedDict):
    completed_sections: list[Section] # Final key we duplicate in outer state for Send() API
    

## Utilities and Helpers

We have a number of utility functions that we'll use to build our graph. Let's take a look at them.

In [4]:
import os
import asyncio
import requests

from tavily import TavilyClient, AsyncTavilyClient
from langchain_community.retrievers import ArxivRetriever
from typing import List, Optional, Dict, Any
from langsmith import traceable

tavily_client = TavilyClient()
tavily_async_client = AsyncTavilyClient()

This function is a small helper that makes sure your configuration values are in a consistent format. If you pass in a string, it simply returns that string. However, if you pass in an enum (a special type of value), it will extract and return the underlying value of that enum. This helps when your configuration might come in different types but you need to work with strings consistently.

In [5]:
def get_config_value(value):
    """
    Helper function to handle both string and enum cases of configuration values
    """
    return value if isinstance(value, str) else value.value

This function filters a configuration dictionary so that it only includes parameters that are allowed for a specific search API. It works by looking up a list of accepted parameter names for the given API (like “exa” or “pubmed”) and then stripping out any other entries from the configuration. If no configuration is provided, it simply returns an empty dictionary. This ensures that only valid and expected parameters are sent to the API.

In [38]:
# Helper function to get search parameters based on the search API and config
def get_search_params(search_api: str, search_api_config: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Filters the search_api_config dictionary to include only parameters accepted by the specified search API.

    Args:
        search_api (str): The search API identifier (e.g., "tavily").
        search_api_config (Optional[Dict[str, Any]]): The configuration dictionary for the search API.

    Returns:
        Dict[str, Any]: A dictionary of parameters to pass to the search function.
    """
    # Define accepted parameters for each search API
    SEARCH_API_PARAMS = {
        "rag": [],  # RAG currently accepts no additional parameters
        "arxiv": ["load_max_docs", "get_full_documents", "load_all_available_meta"],
        "tavily": []  # Tavily currently accepts no additional parameters

    }

    # Get the list of accepted parameters for the given search API
    accepted_params = SEARCH_API_PARAMS.get(search_api, [])

    # If no config provided, return an empty dict
    if not search_api_config:
        return {}

    # Filter the config to only include accepted parameters
    return {k: v for k, v in search_api_config.items() if k in accepted_params}

In [37]:
def get_next_search_type(search_iterations):
    if search_iterations == 0:
        return "RAG search (internal A/B testing knowledge base)"
    elif search_iterations == 1:  
        return "ArXiv web search (search academic papers on arXiv)"
    else:
        return "tavily web search (general web sources)"

This function takes a collection of search results—possibly from several responses—and formats them into a neat, human-readable string. It starts by combining all the results and then removes any duplicates (using the title to check for repeats if a RAG source, otherwise using the URL to check for repeats). For each unique source, it prints out the title (and URL if not RAG source). If enabled, it also includes a trimmed version of the full source content, ensuring that the text does not exceed a specified token limit. This results in a clean, consolidated overview of your search results.

In [166]:
def deduplicate_and_format_sources(search_response, max_tokens_per_source, include_raw_content=True, search_iterations=None, return_has_sources=False):
    """
    Takes a list of search responses and formats them into a readable string.
    Limits the raw_content to approximately max_tokens_per_source.
 
    Args:
        search_responses: List of search response dicts, each containing:
            - query: str
            - results: List of dicts with fields:
                - title: str
                - url: str
                - content: str
                - score: float
                - raw_content: str|None
        max_tokens_per_source: int
        include_raw_content: bool
        search_iterations: int, optional
            If 0, deduplicate by title (for RAG results) and show only title
            Otherwise, deduplicate by URL (for web/arxiv results) and show title + URL
        return_has_sources: bool, optional
            If True, returns (formatted_string, has_sources_bool)
            If False, returns just formatted_string 
            
    Returns:
        str OR tuple: 
            - If return_has_sources=False: formatted string
            - If return_has_sources=True: (formatted_string, has_sources_bool)
    """
    # Collect all results
    sources_list = []
    for response in search_response:
        sources_list.extend(response['results'])
    
    if not sources_list:
        empty_result = ""
        return (empty_result, False) if return_has_sources else empty_result
    
    # Deduplicate by title if search_iterations == 0 (RAG), otherwise by URL
    if search_iterations == 0:
        unique_sources = {source['title']: source for source in sources_list}
    else:
        unique_sources = {source['url']: source for source in sources_list}

    # Check if we have unique sources after deduplication
    has_unique_sources = bool(unique_sources)
    
    if not unique_sources:
        empty_result = ""
        return (empty_result, False) if return_has_sources else empty_result

    # Format output
    formatted_text = ""
    for i, source in enumerate(unique_sources.values(), 1):
        formatted_text += f"{source['title']}\n===\n"
        
        # Only show URL if not RAG results (search_iterations != 0)
        if search_iterations != 0:
            formatted_text += f"URL: {source['url']}\n===\n"
        
        if include_raw_content:
            # Using rough estimate of 4 characters per token
            char_limit = max_tokens_per_source * 4
            # Handle None raw_content
            raw_content = source.get('raw_content', '')
            if raw_content is None:
                raw_content = ''
                print(f"Warning: No raw_content found for source {source['url']}")
            if len(raw_content) > char_limit:
                raw_content = raw_content[:char_limit] + "... [truncated]"
                formatted_text += f"Full source content limited to {max_tokens_per_source} tokens \n\n"
                
    final_result = formatted_text.strip()
    return (final_result, has_unique_sources) if return_has_sources else final_result

Designed to help organize documentation or reports, this function takes a list of section objects and turns them into a well-formatted string. For each section, it prints a header with the section number and name, followed by its description, any research notes, and the main content (or a placeholder if the content isn’t written yet). The output is clearly separated by visual dividers, making it easy to read and understand the structure of the document.

In [8]:
def format_sections(sections: list[Section]) -> str:
    """ Format a list of sections into a string """
    formatted_str = ""
    for idx, section in enumerate(sections, 1):
        formatted_str += f"""
{'='*60} # divider line of 60 equal signs
Section {idx}: {section.name}
{'='*60} # divider line of 60 equal signs
Description:
{section.description}
Requires Research: 
{section.research}

Content:
{section.content if section.content else '[Not yet written]'}

"""
    return formatted_str

This function performs multiple web searches concurrently using the Tavily API. You simply provide it with a list of search queries, and it creates asynchronous tasks for each one. The function then gathers all the responses together, each containing details like the title, URL, snippet of content, and optionally the raw content. This is particularly useful when you need to search several queries at once without waiting for each one to finish sequentially.

In [9]:
@traceable
async def tavily_search_async(search_queries):
    """
    Performs concurrent web searches using the Tavily API.

    Args:
        search_queries (List[SearchQuery]): List of search queries to process

    Returns:
            List[dict]: List of search responses from Tavily API, one per query. Each response has format:
                {
                    'query': str, # The original search query
                    'follow_up_questions': None,      
                    'answer': None,
                    'images': list,
                    'results': [                     # List of search results
                        {
                            'title': str,            # Title of the webpage
                            'url': str,              # URL of the result
                            'content': str,          # Summary/snippet of content
                            'score': float,          # Relevance score
                            'raw_content': str|None  # Full page content if available
                        },
                        ...
                    ]
                }
    """
    
    search_tasks = []
    for query in search_queries:
            search_tasks.append(
                tavily_async_client.search(
                    query,
                    max_results=5,
                    include_raw_content=True,
                    topic="general"
                )
            )

    # Execute all searches concurrently
    search_docs = await asyncio.gather(*search_tasks)

    return search_docs

This function is tailored for searching academic papers on arXiv. It runs asynchronously, so it can handle multiple queries without blocking. For each search query, it initializes an arXiv retriever that gathers documents along with their metadata (like authors, publication dates, and summaries). The results are then formatted to include useful details such as a link to the paper and even a link to the PDF if available. It also assigns a relevance score to each paper and respects arXiv’s rate limits by adding delays between requests.

In [30]:
@traceable
async def arxiv_search_async(search_queries, load_max_docs=5, get_full_documents=True, load_all_available_meta=True):
    """
    Performs concurrent searches on arXiv using the ArxivRetriever.

    Args:
        search_queries (List[str]): List of search queries or article IDs
        load_max_docs (int, optional): Maximum number of documents to return per query. Default is 5.
        get_full_documents (bool, optional): Whether to fetch full text of documents. Default is True.
        load_all_available_meta (bool, optional): Whether to load all available metadata. Default is True.

    Returns:
        List[dict]: List of search responses from arXiv, one per query. Each response has format:
            {
                'query': str,                    # The original search query
                'follow_up_questions': None,      
                'answer': None,
                'images': [],
                'results': [                     # List of search results
                    {
                        'title': str,            # Title of the paper
                        'url': str,              # URL (Entry ID) of the paper
                        'content': str,          # Formatted summary with metadata
                        'score': float,          # Relevance score (approximated)
                        'raw_content': str|None  # Full paper content if available
                    },
                    ...
                ]
            }
    """
    
    async def process_single_query(query):
        try:
            # Create retriever for each query
            retriever = ArxivRetriever(
                load_max_docs=load_max_docs,
                get_full_documents=get_full_documents,
                load_all_available_meta=load_all_available_meta
            )
            
            # Run the synchronous retriever in a thread pool
            loop = asyncio.get_event_loop()
            docs = await loop.run_in_executor(None, lambda: retriever.invoke(query))
            
            results = []
            # Assign decreasing scores based on the order
            base_score = 1.0
            score_decrement = 1.0 / (len(docs) + 1) if docs else 0
            
            for i, doc in enumerate(docs):
                # Extract metadata
                metadata = doc.metadata
                
                # Use entry_id as the URL (this is the actual arxiv link)
                url = metadata.get('entry_id', '')
                
                # Format content with all useful metadata
                content_parts = []

                # Primary information
                if 'Summary' in metadata:
                    content_parts.append(f"Summary: {metadata['Summary']}")

                if 'Authors' in metadata:
                    content_parts.append(f"Authors: {metadata['Authors']}")

                # Add publication information
                published = metadata.get('Published')
                published_str = published.isoformat() if hasattr(published, 'isoformat') else str(published) if published else ''
                if published_str:
                    content_parts.append(f"Published: {published_str}")

                # Add additional metadata if available
                if 'primary_category' in metadata:
                    content_parts.append(f"Primary Category: {metadata['primary_category']}")

                if 'categories' in metadata and metadata['categories']:
                    content_parts.append(f"Categories: {', '.join(metadata['categories'])}")

                if 'comment' in metadata and metadata['comment']:
                    content_parts.append(f"Comment: {metadata['comment']}")

                if 'journal_ref' in metadata and metadata['journal_ref']:
                    content_parts.append(f"Journal Reference: {metadata['journal_ref']}")

                if 'doi' in metadata and metadata['doi']:
                    content_parts.append(f"DOI: {metadata['doi']}")

                # Get PDF link if available in the links
                pdf_link = ""
                if 'links' in metadata and metadata['links']:
                    for link in metadata['links']:
                        if 'pdf' in link:
                            pdf_link = link
                            content_parts.append(f"PDF: {pdf_link}")
                            break

                # Join all content parts with newlines 
                content = "\n".join(content_parts)
                
                result = {
                    'title': metadata.get('Title', ''),
                    'url': url,  # Using entry_id as the URL
                    'content': content,
                    'score': base_score - (i * score_decrement),
                    'raw_content': doc.page_content if get_full_documents else None
                }
                results.append(result)
                
            return {
                'query': query,
                'follow_up_questions': None,
                'answer': None,
                'images': [],
                'results': results
            }
        except Exception as e:
            # Handle exceptions gracefully
            print(f"Error processing arXiv query '{query}': {str(e)}")
            return {
                'query': query,
                'follow_up_questions': None,
                'answer': None,
                'images': [],
                'results': [],
                'error': str(e)
            }
    
    # Process queries sequentially with delay to respect arXiv rate limit (1 request per 3 seconds)
    search_docs = []
    for i, query in enumerate(search_queries):
        try:
            # Add delay between requests (3 seconds per ArXiv's rate limit)
            if i > 0:  # Don't delay the first request
                await asyncio.sleep(3.0)
            
            result = await process_single_query(query)
            search_docs.append(result)
        except Exception as e:
            # Handle exceptions gracefully
            print(f"Error processing arXiv query '{query}': {str(e)}")
            search_docs.append({
                'query': query,
                'follow_up_questions': None,
                'answer': None,
                'images': [],
                'results': [],
                'error': str(e)
            })
            
            # Add additional delay if we hit a rate limit error
            if "429" in str(e) or "Too Many Requests" in str(e):
                print("ArXiv rate limit exceeded. Adding additional delay...")
                await asyncio.sleep(5.0)  # Add a longer delay if we hit a rate limit
    
    return search_docs

Now we want to add our RAG tool to be able to access our collection of Ron Kohavi's work. We first load the processed chunks and our fine-tuned embedding model and then set up our retriever

In [10]:
import json

# Path to the saved JSON file
with open("all_chunks_95percentile.json", "r", encoding="utf-8") as f:
    all_chunks_95percentile = json.load(f)


In [11]:
from langchain_core.documents import Document

all_chunks_95percentile = [Document(page_content=chunk['page_content'], 
                      metadata=chunk['metadata']) 
              for chunk in all_chunks_95percentile]

In [12]:
all_chunks_95percentile[342].page_content


'# c. Page load time (performance)\n\nProposed MDE for power calculations: 1% for 250 msec slowdown (70% of 0.6%*250/100 msec) if done on all pages. [Updated 3/2025] Lower MDE of 0.2% if done for a page (e.g., just search).'

In [15]:
from langchain_huggingface import HuggingFaceEmbeddings

embedding_model = HuggingFaceEmbeddings(model_name="kamkol/ab_testing_finetuned_arctic_ft-36dfff22-0696-40d2-b3bf-268fe2ff2aec")

Some weights of BertModel were not initialized from the model checkpoint at kamkol/ab_testing_finetuned_arctic_ft-36dfff22-0696-40d2-b3bf-268fe2ff2aec and are newly initialized: ['pooler.dense.bias', 'pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [17]:
from langchain_community.vectorstores import Qdrant

qdrant_vectorstore = Qdrant.from_documents(
    all_chunks_95percentile,
    embedding_model,
    location=":memory:",
    collection_name="kohavi_ab_testing_pdf_collection",
)

  description="Check that the field is empty, alternative syntax for `is_empty: \&quot;field_name\&quot;`",
  description="Check that the field is null, alternative syntax for `is_null: \&quot;field_name\&quot;`",


In [18]:
# We create a retriever that will return the top 5 documents for a given query

qdrant_retriever = qdrant_vectorstore.as_retriever(search_kwargs={"k": 5})

In [20]:
from langchain.retrievers import EnsembleRetriever
from langchain_community.retrievers import BM25Retriever

bm25_retriever = BM25Retriever.from_documents(all_chunks_95percentile)
bm25_retriever.k = 5

hybrid_retriever = EnsembleRetriever(
    retrievers=[
        qdrant_retriever, 
        bm25_retriever
    ],
    weights=[0.5, 0.5],
)

In [22]:
from langchain.retrievers.contextual_compression import ContextualCompressionRetriever
from langchain_cohere import CohereRerank

cohere_rerank = CohereRerank(
    model="rerank-english-v3.0",
    top_n=5,
)

reranker = ContextualCompressionRetriever(
    base_compressor=cohere_rerank, base_retriever=hybrid_retriever
)


In [23]:
print(reranker.get_relevant_documents("What is False Positive Risk?"))


  print(reranker.get_relevant_documents("What is False Positive Risk?"))


[Document(metadata={'source': 'Pvalue Misinterpretations Annotated References.pdf', 'section_title': '4. The reproducibility of research and the misinterpretation of p-values (2017) by David Colquhoun.', 'section_level': 1, 'section_id': 'sec_1_1260037657844929439', 'chunk_type': 'text', '_id': '00df71f5197e4fac8bc63d5ab7dae046', '_collection_name': 'kohavi_ab_testing_pdf_collection', 'relevance_score': 0.9979493}, page_content='# 4. The reproducibility of research and the misinterpretation of p-values (2017) by David Colquhoun. Great paper about False Positive Risk. This article has only 256 citations, but I found it extremely insightful about the difference between alpha of 0.05 (what he calls p-less-than) and actually seeing a p-value around 0.05 (what he calls p-equals). In our Intuition Busters paper, we gave the false positive risk of p-less-than, which is much smaller than p-equals. 1. “…if the prior probability of a real effect were only 0.1. And, in this case, if you wanted to

In [31]:
@traceable
async def rag_search_async(search_queries):
    """
    Performs concurrent RAG searches of our thorough A/B testing collection using the reranker.

    Args:
        search_queries (List[SearchQuery]): List of search queries to process

    Returns:
        List[dict]: List of search responses from RAG, one per query. Each response has format:
            {
                'query': str, # The original search query
                'follow_up_questions': None,      
                'answer': None,
                'images': list,
                'results': [                     # List of search results
                    {
                        'title': str,            # Title in format "Kohavi: {title}, Section: {section}"
                        'url': str,              # None for RAG results
                        'content': str,          # None for RAG results
                        'score': float,          # None for RAG results
                        'raw_content': str|None  # Chunk's page_content
                    },
                    ...
                ]
            }
    """
    
    async def single_rag_search(query):
        # Retrieve documents. It's a best practice to return contexts in ascending order
        docs_descending = reranker.get_relevant_documents(query)
        docs = docs_descending[::-1]
        
        # Format each document as a result
        results = []
        for doc in docs:
            source_path = doc.metadata.get("source", "")
            filename = source_path.split("/")[-1] if "/" in source_path else source_path

            # Remove .pdf extension if present
            if filename.endswith('.pdf'):
                filename = filename[:-4]

            section = doc.metadata.get("section_title", "unknown")
            
            title = f"Kohavi: {filename}, Section: {section}"
            
            results.append({
                'title': title,
                'url': None,
                'content': None,
                'score': None,
                'raw_content': doc.page_content
            })
        
        return {
            'query': query,
            'follow_up_questions': None,
            'answer': None,
            'images': [],
            'results': results
        }
    
    # Create tasks for concurrent execution
    search_tasks = [single_rag_search(query) for query in search_queries]
    
    # Execute all searches concurrently
    search_responses = await asyncio.gather(*search_tasks)
    
    return search_responses

In [32]:
result = await rag_search_async(["What is False Positive Risk?"])
print(result)

[{'query': 'What is False Positive Risk?', 'follow_up_questions': None, 'answer': None, 'images': [], 'results': [{'title': 'Kohavi: Online Controlled Experiments at Large Scale, Section: 5.1 False Positives', 'url': None, 'content': None, 'score': None, 'raw_content': '# 5.1 False Positives\n\nFalse positives are “positive findings” that are not actually true. In any large scale system false positives are a commonplace given the many ways in which we violate the assumption of a single hypothesis test done once.'}, {'title': 'Kohavi: Pvalue Misinterpretations Annotated References, Section: Redefine statistical significance (2017) by Benjamin et al.', 'url': None, 'content': None, 'score': None, 'raw_content': 'I suspect every word mattered when reviewed by 72 co-authors. I love the fact that they show two arguments for lowering the alpha threshold: a Bayesian argument, and a False-Positive-Rate/Risk (FPR). 1. “…we believe that a leading cause of non-reproducibility has not yet been ade

## Report Planner and Configurations

This code defines a configuration system for a chatbot that generates reports. Here's a breakdown of its components:

1. **Default Report Structure:**  
   A multi-line string provides a template for creating reports. It suggests starting with an introduction that gives an overview of the topic, then dividing the main content into sections focused on sub-topics, and finally ending with a conclusion that summarizes the main points.

2. **Enumerated Types (Enums):**  
   - **SearchAPI:** Lists possible search services (TAVILY, ARXIV, and RAG) that the chatbot might use to gather information.
   - **PlannerProvider & WriterProvider:** These define options for external service providers that handle planning (organizing the report structure) and writing (generating the text). Options include providers like ANTHROPIC, OPENAI.

3. **Configuration Data Class:**  
   The `Configuration` class holds various settings for the chatbot. Key attributes include:
   - **report_structure:** Uses the default report template.
   - **number_of_queries:** Specifies how many search queries should be generated in each iteration (default is 1).
   - **max_search_depth:** Sets the limit for how many times the chatbot can iterate through reflection and search (default is 3).
   - **planner_provider & planner_model:** Determine which external service and model to use for planning the report.
   - **writer_provider & writer_model:** Specify the external service and model for generating the report text.
   - **search_api & search_api_config:** Indicate which search API to use and allow for additional API configuration.

4. **Configuration Initialization Method:**  
   The class method `from_runnable_config` allows the configuration to be created from a provided configuration object (or from environment variables). It checks for values in the environment (using uppercase names) and from a passed configuration dictionary. Only fields with provided values are used to instantiate the `Configuration` object.

In [167]:
import os
from enum import Enum
from dataclasses import dataclass, fields
from typing import Any, Optional, Dict 

from langchain_core.runnables import RunnableConfig
from dataclasses import dataclass

DEFAULT_REPORT_STRUCTURE = """Use this structure to create a report on the user-provided topic:

1. Introduction (no research needed)
   - Brief overview of the topic area

2. Main Body Sections:
   - Each section should focus on a sub-topic of the user-provided topic
   
3. Conclusion
   - Aim for 1 structural element (either a list of table) that distills the main body sections 
   - Provide a concise summary of the report
   
Provide a paragraph with no more than 500 words to describe the key take aways on the topic"""

# Enum classes in Python create sets of named constants with unique values
class SearchAPI(Enum):
    TAVILY = "tavily"
    ARXIV = "arxiv"
    RAG = "rag"

class PlannerProvider(Enum):
    ANTHROPIC = "anthropic"
    OPENAI = "openai"

class WriterProvider(Enum):
    ANTHROPIC = "anthropic"
    OPENAI = "openai"

# Dataclasses automatically generate boilerplate code for classes that primarily store data
# Dataclasses automatically create __init__, __repr__, __eq__ methods
@dataclass(kw_only=True)
class Configuration:
    """The configurable fields for the chatbot."""
    report_structure: str = DEFAULT_REPORT_STRUCTURE # Defaults to the default report structure

    ### SET THESE NUMBERS HIGHER FOR A LARGER / MORE DETAILED REPORT - YOU MAY RUN INTO RATE LIMITING ISSUES
    number_of_queries: int = 1 # Number of search queries to generate per iteration
    max_search_depth: int = 3 # Maximum number of reflection + search iterations

    ### UNCOMMENT BELOW IF YOU RUN INTO RATE LIMIT ISSUES
    # planner_provider: PlannerProvider = PlannerProvider.OPENAI  # Defaults to OpenAI as provider
    # planner_model: str = "o3-mini" # Defaults to o3-mini, add "-thinking" to enable thinking mode
    # writer_provider: WriterProvider = WriterProvider.OPENAI # Defaults to OpenAI as provider
    #writer_model: str = "o3-mini" # Defaults to o3-mini

    ### COMMENT BELOW IF YOU RUN INTO RATE LIMIT ISSUES
    planner_provider: PlannerProvider = PlannerProvider.ANTHROPIC  # Defaults to Anthropic as provider
    planner_model: str = "claude-opus-4-20250514" # Defaults to claude-opus-4-20250514
    writer_provider: WriterProvider = WriterProvider.ANTHROPIC # Defaults to Anthropic as provider
    writer_model: str = "claude-sonnet-4-20250514" # Defaults to claude-sonnet-4-20250514

    
    search_api: SearchAPI = SearchAPI.TAVILY # Default to TAVILY
    search_api_config: Optional[Dict[str, Any]] = None 

    @classmethod
    def from_runnable_config(
        cls, config: Optional[RunnableConfig] = None
    ) -> "Configuration":
        """Create a Configuration instance from a RunnableConfig."""
        configurable = (
            config["configurable"] if config and "configurable" in config else {}
        )
        values: dict[str, Any] = {
            f.name: os.environ.get(f.name.upper(), configurable.get(f.name))
            for f in fields(cls)
            if f.init
        }
        return cls(**{k: v for k, v in values.items() if v})

## Prompt Templates

These templates are used to guide the chatbot's behavior in different stages of report generation: 

1. **Report Planner Query Writer:**  
   This prompt generates search queries to help with planning the report structure. It asks the model to create queries that will gather information for each section of the report.

2. **Report Plan:**  
   This prompt generates a plan for the report. It asks the model to create a list of sections for the report.

3. **Section Writer:**  
   This prompt generates a section of the report. It asks the model to write a section of the report based on the provided section topic and existing content.

4. **Section Grader:**  
   This prompt grades a section of the report. It asks the model to evaluate whether the section content adequately addresses the section topic.

5. **Final Section Writer:**  
   This prompt generates the final section of the report. It asks the model to write a section of the report that synthesizes information from the rest of the report.

In [168]:
# Prompt to generate search queries to help with planning the report
report_planner_query_writer_instructions="""You are performing research for a report. 

<Report topic>
{topic}
</Report topic>

<Report organization>
{report_organization}
</Report organization>

<Task>
Your goal is to generate {number_of_queries} web search queries that will help gather information for planning the report sections. 

The queries should:

1. Be related to the Report topic
2. Help satisfy the requirements specified in the report organization

Make the queries specific enough to find high-quality, relevant sources while covering the breadth needed for the report structure.
</Task>
"""

# Prompt to generate the report plan
report_planner_instructions="""I want a plan for a report that is concise and focused.

<Report topic>
The topic of the report is:
{topic}
</Report topic>

<Report organization>
The report should follow this organization: 
{report_organization}
</Report organization>

<Context>
Here is context to use to plan the sections of the report: 
{context}
</Context>

<Task>
Generate a list of sections for the report. Your plan should be tight and focused with NO overlapping sections or unnecessary filler. 

For example, a good report structure might look like:
1/ intro
2/ overview of topic A
3/ overview of topic B
4/ comparison between A and B
5/ conclusion

Each section should have the fields:

- Name - Name for this section of the report.
- Description - Brief overview of the main topics covered in this section.
- Research - Whether to perform web research for this section of the report.
- Content - The content of the section, which you will leave blank for now.

Integration guidelines:
- Include examples and implementation details within main topic sections, not as separate sections
- Ensure each section has a distinct purpose with no content overlap
- Combine related concepts rather than separating them

Before submitting, review your structure to ensure it has no redundant sections and follows a logical flow.
</Task>

<Feedback>
Here is feedback on the report structure from review (if any):
{feedback}
</Feedback>
"""

# Query writer instructions
query_writer_instructions="""You are an expert technical writer crafting targeted web search queries that will gather comprehensive information for writing a technical report section.

<Report topic>
{topic}
</Report topic>

<Section topic>
{section_topic}
</Section topic>

<Task>
Your goal is to generate {number_of_queries} search queries that will help gather comprehensive information above the section topic. 

The queries should:

1. Be related to the topic 
2. Examine different aspects of the topic

Make the queries specific enough to find high-quality, relevant sources.
</Task>
"""

# Section writer instructions
section_writer_instructions = """You are an expert technical writer crafting one section of a technical report.

<Report topic>
{topic}
</Report topic>

<Section name>
{section_name}
</Section name>

<Section topic>
{section_topic}
</Section topic>

<Existing section content (if populated)>
{section_content}
</Existing section content>

<Source material>
{context}
</Source material>


<Guidelines for writing>
1. If the existing section content is not populated, write a new section from scratch.
2. If the existing section content is populated, write a new section that synthesizes the existing section content with the Source material. If there is a discrepancy between the existing section content and the Source material, use the existing section content as the primary source. The purpose of the Source material is to provide additional information and context to help fill the gaps in the existing section content.
</Guidelines for writing>

<Length and style>
- Strict 150-200 word limit
- No marketing language
- Technical focus
- Write in simple, clear language
- Start with your most important insight in **bold**
- Use short paragraphs (2-3 sentences max)
- Use ## for section title (Markdown format)
- Only use ONE structural element IF it helps clarify your point:
  * Either a focused table comparing 2-3 key items (using Markdown table syntax)
  * Or a short list (3-5 items) using proper Markdown list syntax:
    - Use `*` or `-` for unordered lists
    - Use `1.` for ordered lists
    - Ensure proper indentation and spacing
</Length and style>

<Quality checks>
- Exactly 150-200 words (excluding title and sources)
- Careful use of only ONE structural element (table or list) and only if it helps clarify your point
- One specific example / case study
- Starts with bold insight
- No preamble prior to creating the section content
- If there is a discrepancy between the existing section content and the Source material, use the existing section content as the primary source. The purpose of the Source material is to provide additional information and context to help fill the gaps in the existing section content.
</Quality checks>
"""

# Instructions for section grading
section_grader_instructions = """Review a report section relative to the specified topic:

<Report topic>
{topic}
</Report topic>

<section topic>
{section_topic}
</section topic>

<section content>
{section}
</section content>

<search type>
{current_iteration}
</search type>

<task>
Evaluate whether the section content adequately addresses the section topic.

If the section content does not adequately address the section topic, generate {number_of_follow_up_queries} follow-up search queries to gather missing information. Note that if search type is 1, your follow-up search queries will be used to search Arxiv for academic papers. If search type is 2 or more, your follow-up search queries will be used to search Tavily for general web search.
</task>

<format>
    grade: Literal["pass","fail"] = Field(
        description="Evaluation result indicating whether the response meets requirements ('pass') or needs revision ('fail')."
    )
    follow_up_queries: List[SearchQuery] = Field(
        description="List of follow-up search queries.",
    )
</format>
"""

final_section_writer_instructions="""You are an expert technical writer crafting a section that synthesizes information from the rest of the report.

<Report topic>
{topic}
</Report topic>

<Section name>
{section_name}
</Section name>

<Section topic> 
{section_topic}
</Section topic>

<Available report content>
{context}
</Available report content>

<Task>
1. Section-Specific Approach:

For Introduction:
- Use # for report title (Markdown format)
- 50-100 word limit
- Write in simple and clear language
- Focus on the core motivation for the report in 1-2 paragraphs
- Use a clear narrative arc to introduce the report
- Include NO structural elements (no lists or tables)
- No sources section needed

For Conclusion/Summary:
- Use ## for section title (Markdown format)
- 100-150 word limit
- For comparative reports:
    * Must include a focused comparison table using Markdown table syntax
    * Table should distill insights from the report
    * Keep table entries clear and concise
- For non-comparative reports: 
    * Only use ONE structural element IF it helps distill the points made in the report:
    * Either a focused table comparing items present in the report (using Markdown table syntax)
    * Or a short list using proper Markdown list syntax:
      - Use `*` or `-` for unordered lists
      - Use `1.` for ordered lists
      - Ensure proper indentation and spacing
- End with specific next steps or implications
- No sources section needed

3. Writing Approach:
- Use concrete details over general statements
- Make every word count
- Focus on your single most important point
</Task>

<Quality Checks>
- For introduction: 50-100 word limit, # for report title, no structural elements, no sources section
- For conclusion: 100-150 word limit, ## for section title, only ONE structural element at most, no sources section
- Markdown format
- Do not include word count or any preamble in your response
</Quality Checks>"""

## Nodes for Our Graph

## 1. `generate_report_plan`

**Purpose:**  
Creates the initial report plan by breaking down the topic into sections and generating search queries to guide further research.

**Key Steps:**
- **Input Extraction:** Retrieves the topic and any feedback provided.
- **Configuration Loading:** Loads settings (e.g., report structure, number of queries, search API details).
- **Query Generation:** Uses a writer model to generate search queries based on the topic and desired report organization.
- **Web Search:** Executes a web search (via APIs like *tavily* or *arxiv*) with the generated queries to retrieve relevant sources.
- **Section Planning:** Uses a planner model to create detailed sections (each with a name, description, plan, research flag, and content field) based on the gathered sources.
- **Output:** Returns a dictionary with a key `"sections"` containing a list of planned sections.

In [169]:
from typing import Literal

from langchain_core.messages import HumanMessage, SystemMessage
from langchain.chat_models import init_chat_model
from langchain_core.runnables import RunnableConfig

from langgraph.constants import Send
from langgraph.graph import START, END, StateGraph
from langgraph.types import interrupt, Command

# Nodes
async def generate_report_plan(state: ReportState, config: RunnableConfig):
    """ Generate the report plan """

    # Inputs
    topic = state["topic"]
    feedback = state.get("feedback_on_report_plan", None)

    # Get configuration
    configurable = Configuration.from_runnable_config(config)
    report_structure = configurable.report_structure
    number_of_queries = configurable.number_of_queries
    # We want to use tavily as the search API for generating the report plan
    search_api = "tavily"
    

    # Convert JSON object to string if necessary
    if isinstance(report_structure, dict):
        report_structure = str(report_structure)

    # Set writer model (model used for query writing and section writing)
    writer_provider = get_config_value(configurable.writer_provider)
    writer_model_name = get_config_value(configurable.writer_model)
    writer_model = init_chat_model(model=writer_model_name, model_provider=writer_provider) 

    # Forces the model to generate valid JSON matching the Queries schema, which 
    # makes it easier to process the results systemically
    structured_llm = writer_model.with_structured_output(Queries)

    # Format system instructions
    system_instructions_query = report_planner_query_writer_instructions.format(topic=topic, report_organization=report_structure, number_of_queries=number_of_queries)

    # Generate queries  
    results = structured_llm.invoke([SystemMessage(content=system_instructions_query),
                                     HumanMessage(content="Generate search queries that will help with planning the sections of the report.")])

    # Web search
    query_list = [query.search_query for query in results.queries]

    search_api_config = configurable.search_api_config or {}
    params_to_pass = get_search_params(search_api, search_api_config)

    # Search the web with parameters
    if search_api == "tavily":
        search_results = await tavily_search_async(query_list, **params_to_pass)
        source_str = deduplicate_and_format_sources(search_results, max_tokens_per_source=1000, include_raw_content=False)
    elif search_api == "arxiv":
        search_results = await arxiv_search_async(query_list, **params_to_pass)
        source_str = deduplicate_and_format_sources(search_results, max_tokens_per_source=1000, include_raw_content=False)
    else:
        raise ValueError(f"Unsupported search API: {search_api}")

    # Format system instructions
    system_instructions_sections = report_planner_instructions.format(topic=topic, report_organization=report_structure, context=source_str, feedback=feedback)

    # Set the planner
    planner_provider = get_config_value(configurable.planner_provider)
    planner_model = get_config_value(configurable.planner_model)

    # Report planner instructions
    planner_message = """Generate the sections of the report. Your response must include a 'sections' field containing a list of sections. 
                        Each section must have: name, description, plan, research, and content fields."""

    # Run the planner

    planner_llm = init_chat_model(
    model=planner_model,  
    model_provider=planner_provider,
    max_tokens=32_000,  
    thinking={"type": "enabled", "budget_tokens": 24_000}  
    )

    # Forces the model to generate valid JSON matching the Sections schema, which 
    # makes it easier to process the results systemically
    structured_llm = planner_llm.with_structured_output(Sections)
    report_sections = structured_llm.invoke([SystemMessage(content=system_instructions_sections),
                                                 HumanMessage(content=planner_message)])

    # Get sections
    sections = report_sections.sections

    return {"sections": sections}

## 2. `human_feedback`

**Purpose:**  
Obtains user feedback on the generated report plan and decides whether to approve the plan or refine it.

**Key Steps:**
- **Plan Formatting:** Converts the planned sections into a readable string format.
- **Feedback Collection:** Uses an interrupt (user prompt) to gather approval or suggestions.
- **Decision Making:**  
  - If approved (feedback is `True`), triggers the building of report sections that require further research.  
  - If not, updates the report plan with the provided feedback and regenerates the plan.
- **Output:** Returns a command directing the next step—either to build sections or to re-run the planning node

In [170]:
def human_feedback(state: ReportState, config: RunnableConfig) -> Command[Literal["generate_report_plan","build_section_with_web_research"]]:
    """ Get feedback on the report plan """

    # Get sections
    topic = state["topic"]
    sections = state['sections']
    sections_str = "\n\n".join(
        f"Section: {section.name}\n"
        f"Description: {section.description}\n"
        f"Research needed: {'Yes' if section.research else 'No'}\n"
        for section in sections
    )

    # Get feedback on the report plan from interrupt
    interrupt_message = f"""Please provide feedback on the following report plan. 
                        \n\n{sections_str}\n\n
                        \nDoes the report plan meet your needs? Pass 'true' without the single quotes to approve the report plan or provide feedback to regenerate the report plan:"""
    
    feedback = interrupt(interrupt_message)

    # If the user approves the report plan, kick off section writing
    if isinstance(feedback, bool) and feedback is True:
        # Treat this as approve and kick off section writing
        # Command(goto=[]) is powerful because it dynamically creates multiple parallel 
        # execution branches - one for each section that requires research
        return Command(goto=[
            Send("build_section_with_web_research", {"topic": topic, "section": s, "search_iterations": 0}) 
            for s in sections 
            if s.research
        ])
    
    # If the user provides feedback, regenerate the report plan 
    elif isinstance(feedback, str):
        # Treat this as feedback
        return Command(goto="generate_report_plan", 
                       update={"feedback_on_report_plan": feedback})
    else:
        raise TypeError(f"Interrupt value of type {type(feedback)} is not supported.")


## 3. `generate_queries`

**Purpose:**  
Generates targeted search queries for a specific report section to drive focused web research.

**Key Steps:**
- **Input Extraction:** Uses the topic and section description.
- **Query Generation:** Utilizes a writer model to generate a set number of search queries tailored to the section.
- **Output:** Returns a dictionary with `"search_queries"` containing the list of generated queries.

In [171]:
def generate_queries(state: SectionState, config: RunnableConfig):
    """ Generate search queries for a report section to query our A/B testing RAG collection """

    # Get state 
    topic = state["topic"]
    section = state["section"]

    # Get configuration
    configurable = Configuration.from_runnable_config(config)
    number_of_queries = configurable.number_of_queries

    # Generate queries 
    writer_provider = get_config_value(configurable.writer_provider)
    writer_model_name = get_config_value(configurable.writer_model)
    writer_model = init_chat_model(model=writer_model_name, model_provider=writer_provider) 
    structured_llm = writer_model.with_structured_output(Queries)

    # Format system instructions
    system_instructions = query_writer_instructions.format(topic=topic, 
                                                           section_topic=section.description, 
                                                           number_of_queries=number_of_queries)

    # Generate queries  
    queries = structured_llm.invoke([SystemMessage(content=system_instructions),
                                     HumanMessage(content="Generate search queries on the provided topic.")])

    return {"search_queries": queries.queries}

## 4. `search_rag_and_web`

**Purpose:**  
Performs RAG and web searches using the generated queries to gather raw sources and relevant information for a report section.

**Key Steps:**
- **Query List Preparation:** Converts the generated queries into a list.
- **API Selection & Execution:** Calls the configured search API (e.g., *tavily*, etc.) with the appropriate parameters. Note that 1st search iteration we use our Kohavi RAG collection, 2nd search iteratin we use arxiv, and 3rd and final search iteration we use tavily (general web search).
- **Result Processing:** Deduplicates and formats the search results into a single string.
- **Iteration Update:** Increments the search iteration count for tracking.
- **Output:** Returns a dictionary with:
  - `"source_str"`: The formatted search result string.
  - `"source_str_all"`: All sources for displaying to the user
  - `"search_iterations"`: The updated count.

In [172]:
async def search_rag_and_web(state: SectionState, config: RunnableConfig):
    """ Search A/B testing RAG collection and web with dual source tracking """

    # Get state 
    search_queries = state["search_queries"]
    search_iterations = state["search_iterations"]
    existing_source_str_all = state.get("source_str_all", "")  # All previous sources

    # Get configuration and choose search API based on iteration
    configurable = Configuration.from_runnable_config(config)
    
    if search_iterations == 0:
        search_api = "rag"
    elif search_iterations == 1:
        search_api = "arxiv"
    else:
        search_api = "tavily"

    # Execute search 
    query_list = [query.search_query for query in search_queries]
    search_api_config = configurable.search_api_config or {}
    params_to_pass = get_search_params(search_api, search_api_config)

    if search_api == "rag":
        search_results = await rag_search_async(query_list)
    elif search_api == "arxiv":
        search_results = await arxiv_search_async(query_list, **params_to_pass)
    elif search_api == "tavily":
        search_results = await tavily_search_async(query_list)
    else:
        raise ValueError(f"Unsupported search API: {search_api}")

    # Format current iteration sources and check if there are any
    # Use return_has_sources=True to get both the formatted string and the boolean
    current_source_str, has_sources = deduplicate_and_format_sources(
        search_results, 
        max_tokens_per_source=1000, 
        include_raw_content=True, 
        search_iterations=search_iterations,
        return_has_sources=True
    )

    # Only add iteration header and sources if there are actually sources to display
    if has_sources:
        iteration_header = f"{'='*80}\nSEARCH ITERATION {search_iterations + 1} - {search_api.upper()} RESULTS\n{'='*80}\n\n"
        
        # Accumulate all sources for user display
        if existing_source_str_all:
            accumulated_source_str = existing_source_str_all + "\n\n" + iteration_header + current_source_str
        else:
            accumulated_source_str = iteration_header + current_source_str
    else:
        # No sources found, don't add header, keep existing sources
        accumulated_source_str = existing_source_str_all
        current_source_str = ""  # No sources for writer

    return {
        "source_str": current_source_str,  # Only current iteration for writer
        "source_str_all": accumulated_source_str,  # All sources for user display
        "search_iterations": search_iterations + 1
    }

## 5. `write_section`

**Purpose:**  
Writes the content of a report section by synthesizing the gathered RAG/web research.

**Key Steps:**
- **Content Generation:** Uses a writer model to create section content based on the topic, section details, and the context provided by the search results.
- **Content Validation:**  
  - Employs a planner model to grade the generated section.  
  - Determines if additional research is needed (by checking the grade or if the maximum search iterations are reached).
- **Control Flow:**  
  - If the section passes or maximum iterations are reached, it publishes the section to completed sections.  
  - Otherwise, it updates the section with follow-up queries (specific to the search tool used for that iteration round) and loops back to `search_rag_and_web`.
- **Output:** Returns a command that either ends the search for this section or directs the next web search iteration.


In [173]:
def write_section(state: SectionState, config: RunnableConfig) -> Command[Literal[END, "search_rag_and_web"]]:
    """ Write a section of the report """

    # Get state 
    topic = state["topic"]
    section = state["section"]
    source_str = state["source_str"]
    search_iterations = state["search_iterations"]  

    # Get configuration
    configurable = Configuration.from_runnable_config(config)

    # Get configuration
    configurable = Configuration.from_runnable_config(config)

    # Format system instructions
    system_instructions = section_writer_instructions.format(topic=topic, 
                                                             section_name=section.name, 
                                                             section_topic=section.description, 
                                                             context=source_str, 
                                                             section_content=section.content)
    
    # Generate section  
    writer_provider = get_config_value(configurable.writer_provider)
    writer_model_name = get_config_value(configurable.writer_model)
    writer_model = init_chat_model(model=writer_model_name, model_provider=writer_provider) 
    section_content = writer_model.invoke([SystemMessage(content=system_instructions),
                                           HumanMessage(content="Generate a report section based on the existing section content (if any) and the provided sources.")])
    
    # Write content to the section object  
    section.content = section_content.content

    # Grade prompt 
    section_grader_message = """Grade the report and consider follow-up questions for missing information.
                               If the grade is 'pass', return empty strings for all follow-up queries.
                               If the grade is 'fail', provide specific search queries to gather missing information."""
    
    section_grader_instructions_formatted = section_grader_instructions.format(topic=topic, 
                                                                               section_topic=section.description,
                                                                               section=section.content, 
                                                                               number_of_follow_up_queries=configurable.number_of_queries,
                                                                               current_iteration=search_iterations)
    
    # Use planner model for reflection
    planner_provider = get_config_value(configurable.planner_provider)
    planner_model = get_config_value(configurable.planner_model)

    reflection_llm = init_chat_model(
    model=planner_model,  
    model_provider=planner_provider,
    max_tokens=32_000,  
    thinking={"type": "enabled", "budget_tokens": 24_000}  
    )

    reflection_model = reflection_llm.with_structured_output(Feedback)
    feedback = reflection_model.invoke([SystemMessage(content=section_grader_instructions_formatted),
                                            HumanMessage(content=section_grader_message)])
    
    # If the section is passing or max depth reached
    if feedback.grade == "pass" or state["search_iterations"] >= configurable.max_search_depth:
        # Store sources in the section object 
        section.sources = state.get("source_str_all", "") 

        return Command(
            update={
                "completed_sections": [section]
            },
            goto=END
        )
    else:
        return Command(
            update={"search_queries": feedback.follow_up_queries, "section": section},
            goto="search_rag_and_web"
        )


## 6. `write_final_sections`

**Purpose:**  
Generates the final version of sections that do not require further research by using the compiled context from completed sections.

**Key Steps:**
- **Context Preparation:** Receives the topic, section details, and the aggregated completed sections.
- **Final Writing:** Uses a writer model to produce the final version of the section content.
- **Output:** Returns a dictionary with `"completed_sections"` updated with the final section content.

In [174]:
def write_final_sections(state: SectionState, config: RunnableConfig):
    """ Write final sections of the report, which do not require RAG or web search and use the completed sections as context """

    # Get configuration
    configurable = Configuration.from_runnable_config(config)

    # Get state 
    topic = state["topic"]
    section = state["section"]
    completed_report_sections = state["report_sections_from_research"]
    
    # Format system instructions
    system_instructions = final_section_writer_instructions.format(topic=topic, section_name=section.name, section_topic=section.description, context=completed_report_sections)

    # Generate section  
    writer_provider = get_config_value(configurable.writer_provider)
    writer_model_name = get_config_value(configurable.writer_model)
    writer_model = init_chat_model(model=writer_model_name, model_provider=writer_provider) 
    section_content = writer_model.invoke([SystemMessage(content=system_instructions),
                                           HumanMessage(content="Generate a report section based on the provided sources.")])
    
    # Write content to section 
    section.content = section_content.content

    # Write the updated section to completed sections
    return {"completed_sections": [section]}

## 7. `gather_completed_sections`

**Purpose:**  
Consolidates all completed sections (in the right order!) into a formatted context string to support final section writing.

**Key Steps:**
- **Aggregation:** Collates completed sections from earlier research in the right order.
- **Formatting:** Converts each section into a string format to be used as context. We make sure to remove sources since we don't need them for writing the final sections (would just clog up the context window).
- **Output:** Returns a dictionary with `"report_sections_from_research"` containing the aggregated context.

In [175]:
def gather_completed_sections(state: ReportState):
    """ Gather completed sections from research and format them as context for writing the final sections """    

    # Get original section order and completed sections
    original_sections = state["sections"]
    completed_sections = state["completed_sections"]
    
    # Create mapping of completed sections by name
    completed_by_name = {s.name: s for s in completed_sections}
    
    # Sort completed sections by original report order
    ordered_completed_sections = []
    for original_section in original_sections:
        if original_section.name in completed_by_name:
            ordered_completed_sections.append(completed_by_name[original_section.name])
    
    # Create sections without sources in correct order
    sections_without_sources = []
    for section in ordered_completed_sections:
        temp_section = Section(
            name=section.name,
            description=section.description,
            research=section.research,
            content=section.content,
            sources=""
        )
        sections_without_sources.append(temp_section)

    # Format in original report order
    completed_report_sections = format_sections(sections_without_sources)

    return {"report_sections_from_research": completed_report_sections}

## 8. `initiate_final_section_writing`

**Purpose:**  
Triggers the final writing phase for report sections that do not need further web research.

**Key Steps:**
- **Selection:** Identifies sections marked as not requiring additional research.
- **Parallel Processing:** Uses a parallelized `Send()` API to launch final section writing tasks concurrently.
- **Output:** Returns a list of `Send` commands for writing final sections.

In [176]:
def initiate_final_section_writing(state: ReportState):
    """ Write any final sections using the Send API to parallelize the process """    

    # Kick off section writing in parallel via Send() API for any sections that do not require research
    return [
        Send("write_final_sections", {"topic": state["topic"], "section": s, "report_sections_from_research": state["report_sections_from_research"]}) 
        for s in state["sections"] 
        if not s.research # only sections that do not require research (e.g. intro and conclusion)
    ]

## 9. `compile_final_report`

**Purpose:**  
Compiles all finalized sections into one cohesive final report.

**Key Steps:**
- **Content Mapping:** Matches the finalized content with the original sections while preserving the intended order.
- **Report Assembly:** Joins all section contents together into a single text string.
- **Sources Assembly:** Handles proper output of all the sources to be displayed to the user.
- **Output:** Returns a dictionary with the key `"final_report"` containing the complete report.

In [177]:
def compile_final_report(state: ReportState):
    """ Compile the final report with section-grouped sources only for research sections """    

    # Get sections and sources
    sections = state["sections"]
    completed_sections = {s.name: s.content for s in state["completed_sections"]}

    # Update sections with completed content while maintaining original order
    for section in sections:
        section.content = completed_sections[section.name]

    # Compile main report
    main_report = "\n\n".join([s.content for s in sections])
    
    # Add sources section with organization by research sections only
    research_sections_with_sources = [s for s in state["completed_sections"] if s.research and s.sources]
    
    if research_sections_with_sources:
        sources_section = "\n\n## Sources Used\n\n"
        
        # Iterate through sections in original order and add sources if they exist
        for section in sections:
            if section.research:
                # Find the completed section with sources
                completed_section = next((s for s in state["completed_sections"] if s.name == section.name), None)
                if completed_section and completed_section.sources:
                    sources_section += f"### Sources for Section: {section.name}\n\n"
                    sources_section += completed_section.sources + "\n\n"
        
        final_report_with_sources = main_report + sources_section
    else:
        final_report_with_sources = main_report

    return {"final_report": final_report_with_sources}

## BUILD THAT GRAPH!!!!

In [178]:
section_builder = StateGraph(SectionState, output=SectionOutputState)
section_builder.add_node("generate_queries", generate_queries)
section_builder.add_node("search_rag_and_web", search_rag_and_web)
section_builder.add_node("write_section", write_section)

# Add edges
section_builder.add_edge(START, "generate_queries")
section_builder.add_edge("generate_queries", "search_rag_and_web")
section_builder.add_edge("search_rag_and_web", "write_section")

# Outer graph -- 

# Add nodes
builder = StateGraph(ReportState, input=ReportStateInput, output=ReportStateOutput, config_schema=Configuration)
builder.add_node("generate_report_plan", generate_report_plan)
builder.add_node("human_feedback", human_feedback)
builder.add_node("build_section_with_web_research", section_builder.compile())
builder.add_node("gather_completed_sections", gather_completed_sections)
builder.add_node("write_final_sections", write_final_sections)
builder.add_node("compile_final_report", compile_final_report)

# Add edges
builder.add_edge(START, "generate_report_plan")
builder.add_edge("generate_report_plan", "human_feedback")
builder.add_edge("build_section_with_web_research", "gather_completed_sections")
builder.add_conditional_edges("gather_completed_sections", initiate_final_section_writing, ["write_final_sections"])
builder.add_edge("write_final_sections", "compile_final_report")
builder.add_edge("compile_final_report", END)

<langgraph.graph.state.StateGraph at 0x175bdfce0>

## Using the Graph: WITH CHECKPOINTS!

In [179]:
from langgraph.checkpoint.memory import MemorySaver
from IPython.display import Markdown, display

# Without checkpointing, any interactive elements (e.g. user feedback) would require 
# restarting the entire graph from scratch.
# Create a memory saver for checkpointing
memory = MemorySaver()

# Compile the graph with the checkpointer
graph_with_checkpoint = builder.compile(checkpointer=memory)

In [180]:
# Create a unique thread ID to identify the specific graph execution for checkpointing
import uuid
thread_id = str(uuid.uuid4())

# Start the graph execution with the topic and display the final report when it appears
async def run_graph_and_show_report():
    """Run the graph and display the final report when it appears"""
    async for chunk in graph_with_checkpoint.astream(
        {"topic": "Group Sequential Testing"}, 
        {"configurable": {"thread_id": thread_id}},
        stream_mode="updates"
    ):
        print(chunk)
        print("\n")
        
        # Check if this chunk contains the final_report
        if isinstance(chunk, dict) and 'final_report' in chunk:
            print("🎉 Final report generated! 🎉")
            display(Markdown(f"# Group Sequential Testing Report\n\n{chunk['final_report']}"))
            return
        
        # Check if this is an interrupt that needs user feedback
        if isinstance(chunk, dict) and '__interrupt__' in chunk:
            interrupt_value = chunk['__interrupt__'][0].value
            display(Markdown(f"**Feedback Request:**\n{interrupt_value}"))
            return  # Stop execution to allow user to provide feedback

# Run the graph
await run_graph_and_show_report()



{'generate_report_plan': {'sections': [Section(name='Introduction to Group Sequential Testing', description='Overview of group sequential testing methodology, its purpose in clinical trials, and advantages over traditional fixed-sample designs. Covers the fundamental concept of interim analyses and early stopping decisions.', research=False, content='', sources=''), Section(name='Statistical Framework and Stopping Boundaries', description="Core statistical principles underlying group sequential designs, including alpha spending functions, types of stopping boundaries (Pocock, O'Brien-Fleming, Wang-Tsiatis), and methods for controlling Type I error rates across multiple analyses.", research=True, content='', sources=''), Section(name='Types of Group Sequential Designs and Applications', description='Comprehensive overview of different group sequential design approaches including classical designs, adaptive designs, and hybrid approaches. Covers selection criteria, advantages and limitat

**Feedback Request:**
Please provide feedback on the following report plan. 
                        

Section: Introduction to Group Sequential Testing
Description: Overview of group sequential testing methodology, its purpose in clinical trials, and advantages over traditional fixed-sample designs. Covers the fundamental concept of interim analyses and early stopping decisions.
Research needed: No


Section: Statistical Framework and Stopping Boundaries
Description: Core statistical principles underlying group sequential designs, including alpha spending functions, types of stopping boundaries (Pocock, O'Brien-Fleming, Wang-Tsiatis), and methods for controlling Type I error rates across multiple analyses.
Research needed: Yes


Section: Types of Group Sequential Designs and Applications
Description: Comprehensive overview of different group sequential design approaches including classical designs, adaptive designs, and hybrid approaches. Covers selection criteria, advantages and limitations of each type, with practical examples from clinical trials.
Research needed: Yes


Section: Implementation Considerations and Regulatory Guidance
Description: Practical aspects of implementing group sequential designs in clinical trials, including sample size calculations, software tools, data monitoring committee operations, and regulatory perspectives from FDA and other agencies. Includes best practices for design specification and execution.
Research needed: Yes


Section: Conclusion and Key Takeaways
Description: Summary of group sequential testing benefits and challenges, featuring a comparison table of major design types. Includes a comprehensive paragraph (under 500 words) synthesizing the key takeaways and future directions in group sequential methodology.
Research needed: No



                        
Does the report plan meet your needs? Pass 'true' without the single quotes to approve the report plan or provide feedback to regenerate the report plan:

In [181]:
async def approve_plan():
    """Approve the plan and continue execution"""
    async for chunk in graph_with_checkpoint.astream(
        Command(resume=True), 
        {"configurable": {"thread_id": thread_id}},
        stream_mode="updates"
    ):
        print(chunk)
        print("\n")
        
        # Check if this chunk contains the compile_final_report with final_report
        if isinstance(chunk, dict) and 'compile_final_report' in chunk:
            if 'final_report' in chunk['compile_final_report']:
                print("🎉 Final report generated! 🎉")
                final_report = chunk['compile_final_report']['final_report']
                display(Markdown(f"# Group Sequential Testing Report\n\n{final_report}"))
                return

In [182]:
async def provide_feedback(feedback_text):
    """Provide feedback and continue execution"""
    async for chunk in graph_with_checkpoint.astream(
        Command(resume=feedback_text), 
        {"configurable": {"thread_id": thread_id}},
        stream_mode="updates"
    ):
        print(chunk)
        print("\n")
        
        # Check if this chunk contains the final_report
        if isinstance(chunk, dict) and 'final_report' in chunk:
            print("🎉 Final report generated! 🎉")
            display(Markdown(f"# Group Sequential Testing Report\n\n{chunk['final_report']}"))
            return

> NOTE: You *can* choose to continue the flow - though the notebook implementation will require you to stretch your coding muscles a bit!

In [183]:
await approve_plan()

{'human_feedback': None}


























{'write_final_sections': {'completed_sections': [Section(name='Conclusion and Key Takeaways', description='Summary of group sequential testing benefits and challenges, featuring a comparison table of major design types. Includes a comprehensive paragraph (under 500 words) synthesizing the key takeaways and future directions in group sequential methodology.', research=False, content="## Conclusion and Key Takeaways\n\nGroup sequential testing offers powerful early stopping capabilities while maintaining statistical rigor, but implementation complexity varies significantly across design types. The choice between boundary approaches involves fundamental trade-offs between early detection sensitivity and sample size requirements.\n\n| Design Type | Primary Advantage | Key Limitation | Best Use Case |\n|-------------|------------------|----------------|---------------|\n| O'Brien-Fleming | Preserves power, conservative early stopping | Requires strong early evidence | Confirmatory trial

# Group Sequential Testing Report

# Group Sequential Testing

Group sequential testing revolutionizes clinical trial efficiency by enabling interim analyses with pre-planned stopping rules, offering a compelling alternative to traditional fixed-sample designs. This methodology allows trials to terminate early for efficacy, futility, or safety concerns while maintaining statistical validity through carefully constructed boundaries. By incorporating multiple planned analyses throughout the study duration, investigators can make informed decisions about continuing, modifying, or stopping trials based on accumulating evidence, potentially saving time, resources, and exposing fewer participants to ineffective treatments.

## Summary

Group sequential testing has matured from simple boundary functions to sophisticated frameworks accommodating complex trial architectures. The methodology balances early stopping opportunities against Type I error control through alpha spending functions and information-based boundaries. Modern applications extend beyond traditional efficacy testing to encompass adaptive designs, platform trials, and hybrid approaches that maintain regulatory acceptance while providing operational flexibility.

| Design Element | Traditional Approach | Group Sequential Advantage |
|---------------|---------------------|---------------------------|
| Sample size | Fixed at design | Potentially reduced through early stopping |
| Decision timing | Single final analysis | Multiple planned interim analyses |
| Error control | Simple alpha level | Alpha spending across analyses |
| Regulatory acceptance | Straightforward | Requires pre-specification and consultation |

Implementation success depends on careful pre-specification of all sequential parameters, appropriate software validation, and enhanced data monitoring committee training. The COVID-19 pandemic demonstrated both the value and challenges of flexible designs when enrollment patterns deviate from assumptions. Future applications will likely emphasize platform trial architectures and real-world evidence integration while maintaining statistical rigor.

## Statistical Framework and Stopping Boundaries

**Group sequential testing requires careful balance between early stopping opportunities and Type I error control through mathematically rigorous boundary frameworks that extend beyond traditional efficacy-only designs.**

Alpha spending functions provide the foundation for controlling overall significance levels across multiple interim analyses. The Lan-DeMets approach allows flexible timing of analyses by specifying how the total Type I error rate (typically 0.05) is allocated across planned looks, ensuring statistical validity regardless of when stopping occurs.

Information fractions define the proportion of total planned information available at each interim analysis. These fractions drive boundary calculations and must account for missing data patterns and endpoint-specific accumulation rates.

Three primary boundary types dominate practice:

* **Pocock boundaries** maintain constant critical values across all analyses, enabling easier interpretation but requiring larger sample sizes
* **O'Brien-Fleming boundaries** use conservative early thresholds that relax toward standard levels, preserving power while allowing early efficacy detection  
* **Wang-Tsiatis boundaries** offer flexible parameterization between Pocock and O'Brien-Fleming extremes

Modern applications extend to changing primary endpoints mid-trial through flexible boundary adjustments. LinkedIn's experimentation platform demonstrates practical trade-offs: O'Brien-Fleming boundaries reduce false positives in early analyses but require stronger evidence for early termination.

## Types of Group Sequential Designs and Applications

**Group sequential designs have evolved from simple two-arm comparisons to sophisticated multi-stage frameworks that accommodate complex trial architectures while maintaining statistical rigor.**

Classical designs form the foundation with well-established boundary functions. O'Brien-Fleming boundaries provide conservative early stopping with α-spending concentrated at final analysis, making them ideal for confirmatory trials requiring high evidence standards. Pocock boundaries distribute α equally across analyses, offering simpler implementation but demanding larger sample sizes.

Adaptive designs enable protocol modifications based on accumulating data. Sample size re-estimation addresses initial planning uncertainties, while population enrichment focuses recruitment on responsive subgroups. These designs require careful pre-specification to maintain statistical validity.

Platform trials represent the most complex application, allowing multiple treatments to enter and exit dynamically. They require sophisticated multiple comparison procedures and online error rate control as demonstrated in COVID-19 vaccine trials where treatments were added sequentially.

Hybrid approaches combine fixed and adaptive elements, balancing operational flexibility with regulatory acceptance. The choice depends on study objectives, regulatory pathway, and institutional capacity for complex interim decision-making.

| Design Type | Boundary Characteristic | Optimal Application |
|-------------|------------------------|-------------------|
| O'Brien-Fleming | Conservative early, liberal late | Confirmatory trials |
| Pocock | Constant across analyses | Exploratory studies |
| Adaptive | Data-driven modifications | Uncertain populations |

## Implementation Considerations and Regulatory Guidance

**Information fraction calculations require careful definition of statistical information rather than simple sample size ratios, particularly when dealing with time-to-event endpoints or unequal allocation schemes.** This fundamental concept drives all subsequent implementation decisions.

Sample size calculations must account for information fractions and spending functions, with validation across multiple platforms. The alpha spending function approach provides flexible boundary construction, allowing investigators to specify spending rates at planned interim analyses without rigid pre-commitment to specific critical values.

Software tools like East, PASS, and R's gsDesign package handle sequential boundaries effectively. Recent simulation methods enable practical sample size calculations by incorporating realistic assumptions about enrollment patterns, dropout rates, and effect size uncertainty.

Data monitoring committees require enhanced training on information-based stopping rules and access to updated boundary calculations at each interim analysis. The COVID-19 pandemic demonstrated how enrollment disruptions can affect information accrual timing, making flexible spending functions essential for maintaining statistical validity.

Regulatory agencies expect comprehensive statistical analysis plans detailing interim timing rationale, alpha spending justification, and clear stopping criteria. FDA guidance emphasizes pre-specification of all sequential testing parameters and early consultation to ensure design acceptance before trial initiation.

## Conclusion and Key Takeaways

Group sequential testing offers powerful early stopping capabilities while maintaining statistical rigor, but implementation complexity varies significantly across design types. The choice between boundary approaches involves fundamental trade-offs between early detection sensitivity and sample size requirements.

| Design Type | Primary Advantage | Key Limitation | Best Use Case |
|-------------|------------------|----------------|---------------|
| O'Brien-Fleming | Preserves power, conservative early stopping | Requires strong early evidence | Confirmatory trials |
| Pocock | Simple interpretation, flexible stopping | Larger sample sizes needed | Exploratory studies |
| Adaptive | Protocol flexibility, uncertainty handling | Complex implementation | Unknown populations |
| Wang-Tsiatis | Customizable boundary shapes | Parameter selection complexity | Tailored applications |

Future directions emphasize platform trials and real-world evidence integration, driven by pandemic-era innovations in flexible trial design. The evolution toward information-based rather than sample-based stopping criteria represents a fundamental shift requiring enhanced statistical expertise and regulatory collaboration. Success depends on pre-specification rigor, appropriate software implementation, and data monitoring committee sophistication in managing complex interim decision frameworks.

## Sources Used

### Sources for Section: Statistical Framework and Stopping Boundaries

================================================================================
SEARCH ITERATION 1 - RAG RESULTS
================================================================================

Kohavi: LinkedIn Post (New_AB_Pattern_Reproduction_Results), Section: unknown
===
Kohavi: Controlled Experiments on the Web Survey and Practical Guide, Section: 3. Primacy and newness effects.
===
Kohavi: LinkedIn Post (Lead_with_Clear_Memorable_Leadership_Principl), Section: unknown
===
Kohavi: Why are Power Calculators Giving Different Results, Section: Untitled
===
Kohavi: Online Controlled Experiments at Large Scale, Section: 4.3 Alerts and Aborting Bad Experiments
===

================================================================================
SEARCH ITERATION 2 - ARXIV RESULTS
================================================================================

Exact sequential single-arm trial design with curtailment for binary endpoint
===
URL: http://arxiv.org/abs/2303.17091v1
===
Interim Analysis in Sequential Multiple Assignment Randomized Trials for Survival Outcomes
===
URL: http://arxiv.org/abs/2504.03143v1
===
Bayesian response adaptive randomization design with a composite endpoint of mortality and morbidity
===
URL: http://arxiv.org/abs/2208.08472v3
===

================================================================================
SEARCH ITERATION 3 - TAVILY RESULTS
================================================================================

Defining information fractions in group sequential clinical trials with ...
===
URL: https://www.sciencedirect.com/science/article/pii/S2451865418300097
===
Flexible Stopping Boundaries When Changing Primary Endpoints after ...
===
URL: https://pmc.ncbi.nlm.nih.gov/articles/PMC4024106/
===
PDF
===
URL: https://www.biostat.wisc.edu/~chappell/641/papers/paper35.pdf
===
9.6 - Alpha Spending Function approach | STAT 509 - Statistics Online
===
URL: https://online.stat.psu.edu/stat509/lesson/9/9.6
===
PDF
===
URL: https://eclass.uoa.gr/modules/document/file.php/MATH301/PracticalSession3/LanDeMets.pdf
===
Full source content limited to 1000 tokens

### Sources for Section: Types of Group Sequential Designs and Applications

================================================================================
SEARCH ITERATION 1 - RAG RESULTS
================================================================================

Kohavi: Online Controlled Experiments and AB Tests, Section: Increasing Experiment Sensitivity
===
Kohavi: Online Controlled Experiments and AB Tests, Section: Sample size.
===
Kohavi: Controlled Experiments on the Web Survey and Practical Guide, Section: Abstract
===
Kohavi: Online Controlled Experiments at Large Scale, Section: 3.1 Why Controlled Experiments?
===
Kohavi: Controlled Experiments on the Web Survey and Practical Guide, Section: 3.6 Limitations
===

================================================================================
SEARCH ITERATION 2 - ARXIV RESULTS
================================================================================

Interim Monitoring of Sequential Multiple Assignment Randomized Trials Using Partial Information
===
URL: http://arxiv.org/abs/2209.06306v2
===
Interim Analysis in Sequential Multiple Assignment Randomized Trials for Survival Outcomes
===
URL: http://arxiv.org/abs/2504.03143v1
===
Online control of the False Discovery Rate in group-sequential platform trials
===
URL: http://arxiv.org/abs/2112.10619v1
===

================================================================================
SEARCH ITERATION 3 - TAVILY RESULTS
================================================================================

PDF
===
URL: https://mwsug.org/proceedings/2016/PH/MWSUG-2016-PH06.pdf
===
Full source content limited to 1000 tokens 

6 Deriving group sequential designs - gsDesign Technical Manual
===
URL: https://keaven.github.io/gsd-tech-manual/gsdesign.html
===
Full source content limited to 1000 tokens 

Guidance on interim analysis methods in clinical trials - PMC
===
URL: https://pmc.ncbi.nlm.nih.gov/articles/PMC10260346/
===
9.5 - Frequentist Methods: O'Brien-Fleming, Pocock, Haybittle-Peto
===
URL: https://online.stat.psu.edu/stat509/lesson/9/9.5
===
Full source content limited to 1000 tokens 

Interim Analyses During Group Sequential Clinical Trials
===
URL: https://jamanetwork.com/journals/jama/fullarticle/2784821
===

### Sources for Section: Implementation Considerations and Regulatory Guidance

================================================================================
SEARCH ITERATION 1 - RAG RESULTS
================================================================================

Kohavi: Controlled Experiments on the Web Survey and Practical Guide, Section: 6.2.3 Determine the minimum sample size
===
Kohavi: Online Controlled Experiments at Large Scale, Section: 5.1 False Positives
===
Kohavi: The Surprising Power of Online Experiments, Section: Center-of-excellence model
===
Kohavi: Controlled Experiments on the Web Survey and Practical Guide, Section: 5.2 Assignment method
===
Kohavi: Why are Power Calculators Giving Different Results, Section: Explaining the differences:
===

================================================================================
SEARCH ITERATION 2 - ARXIV RESULTS
================================================================================

Clinical trials impacted by the COVID-19 pandemic: Adaptive designs to the rescue?
===
URL: http://arxiv.org/abs/2005.13979v1
===
Confidence intervals for adaptive trial designs II: Case study and practical guidance
===
URL: http://arxiv.org/abs/2411.08771v1
===
Confidence intervals for adaptive trial designs I: A methodological review
===
URL: http://arxiv.org/abs/2411.08495v1
===

================================================================================
SEARCH ITERATION 3 - TAVILY RESULTS
================================================================================

Defining information fractions in group sequential clinical trials with ...
===
URL: https://www.sciencedirect.com/science/article/pii/S2451865418300097
===
A Practical Simulation Method to Calculate Sample Size of Group ...
===
URL: https://pmc.ncbi.nlm.nih.gov/articles/PMC3434206/
===
8 Spending functions - Group Sequential Designs Made Easy
===
URL: https://keaven.github.io/gsd-shiny/spending.html
===
Full source content limited to 1000 tokens 

Interim Analyses During Group Sequential Clinical Trials
===
URL: https://jamanetwork.com/journals/jama/fullarticle/2784821
===
9.6 - Alpha Spending Function approach | STAT 509 - Statistics Online
===
URL: https://online.stat.psu.edu/stat509/lesson/9/9.6
===

