# Topic 6: Subgraphs

Learn how to build modular, reusable graph components using subgraphs. Create complex applications by composing smaller graphs together.

## Learning Objectives

- Understand subgraphs and modularity
- Create reusable graph components
- Compose subgraphs into larger workflows
- Manage state between parent and child graphs
- Build maintainable, scalable applications

In [None]:
# Setup
import os
import getpass
from typing import TypedDict, Literal, Annotated
from langgraph.graph import StateGraph, START, END
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import HumanMessage
import operator

if "ANTHROPIC_API_KEY" not in os.environ:
    os.environ["ANTHROPIC_API_KEY"] = getpass.getpass("Enter your Anthropic API key: ")

model = ChatAnthropic(model="claude-3-5-sonnet-20241022")
print("✓ Setup complete!")

## What are Subgraphs?

Subgraphs allow you to:
- **Encapsulate logic**: Package related functionality into reusable components
- **Improve maintainability**: Separate concerns and organize complex workflows
- **Enable reuse**: Use the same subgraph in multiple places
- **Simplify testing**: Test subgraphs independently

Think of a subgraph as a function whose body is a complete graph. Once compiled, it can be added to a parent graph with `add_node`, just like an ordinary node function.
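
As a rough mental model, the dependency-free sketch below treats a graph as an ordered list of node functions. Each node returns a partial state update that is merged into the running state. Wrapping such a list with `as_node` turns it into a single callable that another sequence can use as a node. Both `run_sequence` and `as_node` are hypothetical helpers, and the sketch ignores branching, reducers, and everything else LangGraph actually does.

```python
from typing import Callable

# Hypothetical model: a node takes the current state and returns a partial update.
Node = Callable[[dict], dict]


def run_sequence(nodes: list[Node], state: dict) -> dict:
    """Run nodes in order, merging each update into a copy of the state."""
    current = dict(state)
    for node in nodes:
        current.update(node(current) or {})
    return current


def as_node(nodes: list[Node]) -> Node:
    """Package a whole sequence as one node: the 'subgraph' in this model."""
    def subgraph(state: dict) -> dict:
        return run_sequence(nodes, state)
    return subgraph


def strip_text(state: dict) -> dict:
    return {"text": state["text"].strip()}


def count_words(state: dict) -> dict:
    return {"word_count": len(state["text"].split())}


def mark_done(state: dict) -> dict:
    return {"done": True}


# A two-node "subgraph" used as a single step inside a larger sequence
text_stats = as_node([strip_text, count_words])
result = run_sequence([text_stats, mark_done], {"text": "  hello subgraph world  "})
print(result)  # {'text': 'hello subgraph world', 'word_count': 3, 'done': True}
```

The outer sequence never needs to know that `text_stats` contains two steps. A compiled LangGraph subgraph can likewise sit behind a single `add_node` call.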

## Example 1: Reusable Validation Subgraph

Let's create a validation subgraph that can be reused across different workflows.

In [None]:
# Define state for the validation subgraph
class ValidationState(TypedDict):
    data: str
    validation_result: str
    is_valid: bool
    error_messages: list[str]

print("✓ ValidationState defined")

In [None]:
# Create validation nodes
def check_length(state: ValidationState) -> ValidationState:
    """Check if data meets length requirements."""
    print("   📏 Checking length...")
    
    # Copy the list so the incoming state is not mutated in place
    errors = list(state.get("error_messages", []))
    data = state["data"]
    
    if len(data) < 10:
        errors.append("Data too short (minimum 10 characters)")
    elif len(data) > 500:
        errors.append("Data too long (maximum 500 characters)")
    
    return {"error_messages": errors}

def check_format(state: ValidationState) -> ValidationState:
    """Check if data has valid format."""
    print("   📋 Checking format...")
    
    # Copy the list so the incoming state is not mutated in place
    errors = list(state.get("error_messages", []))
    data = state["data"]
    
    # Use the LLM to check formatting and content quality
    prompt = f"""Analyze this text and identify any formatting or content quality issues:

{data}

If there are issues, list them briefly (one per line).
If the text is good quality and well-formatted, respond with: "No issues found"."""
    
    response = model.invoke([HumanMessage(content=prompt)])
    
    # Any reply that does not contain "no issues" counts as a failure (first 100 chars kept)
    if "no issues" not in response.content.lower():
        errors.append(f"Format issues: {response.content[:100]}")
    
    return {"error_messages": errors}

def finalize_validation(state: ValidationState) -> ValidationState:
    """Finalize validation results."""
    print("   ✅ Finalizing validation...")
    
    errors = state.get("error_messages", [])
    is_valid = len(errors) == 0
    
    if is_valid:
        result = "✓ Validation passed"
    else:
        result = f"✗ Validation failed: {len(errors)} issue(s) found"
    
    return {
        "is_valid": is_valid,
        "validation_result": result
    }

print("✓ Validation nodes created")
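
Appending to the list returned by `state.get("error_messages", [])` deserves care. In plain Python, that list is the same object stored in the state dict, so appending to it also changes the input. Whether LangGraph hands a node a shared or a copied list is an implementation detail you shouldn't rely on, so building a new list is the safer habit. The sketch below shows the difference using two hypothetical helpers, `append_in_place` and `append_copy`.

```python
def append_in_place(state: dict, message: str) -> dict:
    """Hypothetical helper: reuses the list stored in `state` (mutates the input)."""
    errors = state.get("error_messages", [])
    errors.append(message)
    return {"error_messages": errors}


def append_copy(state: dict, message: str) -> dict:
    """Hypothetical helper: builds a new list, leaving `state` untouched."""
    errors = list(state.get("error_messages", []))
    errors.append(message)
    return {"error_messages": errors}


shared = {"error_messages": []}
append_in_place(shared, "Data too short")
print(shared)     # {'error_messages': ['Data too short']}  (input changed)

untouched = {"error_messages": []}
update = append_copy(untouched, "Data too short")
print(untouched)  # {'error_messages': []}
print(update)     # {'error_messages': ['Data too short']}
```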

In [None]:
# Build the validation subgraph
validation_builder = StateGraph(ValidationState)

# Add nodes
validation_builder.add_node("check_length", check_length)
validation_builder.add_node("check_format", check_format)
validation_builder.add_node("finalize", finalize_validation)

# Add edges
validation_builder.add_edge(START, "check_length")
validation_builder.add_edge("check_length", "check_format")
validation_builder.add_edge("check_format", "finalize")
validation_builder.add_edge("finalize", END)

# Compile the subgraph
validation_subgraph = validation_builder.compile()

print("✓ Validation subgraph compiled!")

## Test the Validation Subgraph

Let's test our reusable validation component:

In [None]:
# Test with valid data
print("\nTest 1: Valid data")
print("="*60)

result = validation_subgraph.invoke({
    "data": "This is a well-formatted piece of text that meets all requirements and contains meaningful content.",
    "validation_result": "",
    "is_valid": False,
    "error_messages": []
})

print(f"\nResult: {result['validation_result']}")
print(f"Valid: {result['is_valid']}")
if result['error_messages']:
    print(f"Errors: {result['error_messages']}")

In [None]:
# Test with invalid data
print("\nTest 2: Invalid data (too short)")
print("="*60)

result = validation_subgraph.invoke({
    "data": "Short",
    "validation_result": "",
    "is_valid": False,
    "error_messages": []
})

print(f"\nResult: {result['validation_result']}")
print(f"Valid: {result['is_valid']}")
if result['error_messages']:
    print("Errors:")
    for error in result['error_messages']:
        print(f"  - {error}")

## Example 2: Using Subgraphs in a Parent Workflow

Now let's use our validation subgraph as part of a larger content processing pipeline.
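
The parent state repeats the four validation keys. When a compiled subgraph is added directly with `add_node`, the two graphs exchange data through the keys their schemas share. The sketch below models that exchange in plain Python under simplifying assumptions (no reducers, no checkpointing). `call_subgraph` and `fake_validation` are hypothetical, and the fake keeps only the length rule so it runs without an API key.

```python
from typing import Callable


def call_subgraph(parent_state: dict, subgraph_keys: set[str],
                  subgraph: Callable[[dict], dict]) -> dict:
    """Hypothetical model: pass only shared keys in, return only shared keys out."""
    child_input = {k: v for k, v in parent_state.items() if k in subgraph_keys}
    child_output = subgraph(child_input)
    return {k: v for k, v in child_output.items() if k in subgraph_keys}


VALIDATION_KEYS = {"data", "validation_result", "is_valid", "error_messages"}


def fake_validation(state: dict) -> dict:
    # Stand-in for the compiled validation subgraph: length rule only, no LLM call
    errors = [] if len(state["data"]) >= 10 else ["Data too short (minimum 10 characters)"]
    return {**state, "error_messages": errors, "is_valid": not errors,
            "validation_result": "passed" if not errors else "failed"}


parent = {"raw_content": "Short", "data": "Short", "error_messages": [],
          "processing_status": ""}
parent.update(call_subgraph(parent, VALIDATION_KEYS, fake_validation))
print(parent["is_valid"], parent["error_messages"])
# False ['Data too short (minimum 10 characters)']
```

In this model, `raw_content` never reaches the subgraph, and only the shared keys come back as the parent's update.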

In [None]:
# Define state for the parent workflow
class ContentProcessingState(TypedDict):
    raw_content: str
    data: str  # Used by validation subgraph
    validation_result: str  # Used by validation subgraph
    is_valid: bool  # Used by validation subgraph
    error_messages: list[str]  # Used by validation subgraph
    processed_content: str
    final_output: str
    processing_status: str

print("✓ ContentProcessingState defined")

In [None]:
# Create parent workflow nodes
def prepare_content(state: ContentProcessingState) -> ContentProcessingState:
    """Prepare raw content for validation."""
    print("\n📥 Preparing content...")
    
    # Copy raw content to the field expected by validation subgraph
    return {
        "data": state["raw_content"],
        "error_messages": []
    }

def process_valid_content(state: ContentProcessingState) -> ContentProcessingState:
    """Process content that passed validation."""
    print("\n⚙️  Processing valid content...")
    
    prompt = f"""Enhance and improve this content while maintaining its core message:

{state['data']}

Make it more engaging and well-structured."""
    
    response = model.invoke([HumanMessage(content=prompt)])
    
    return {
        "processed_content": response.content,
        "final_output": response.content,
        "processing_status": "success"
    }

def handle_invalid_content(state: ContentProcessingState) -> ContentProcessingState:
    """Handle content that failed validation."""
    print("\n❌ Handling invalid content...")
    
    errors = "\n".join([f"  - {err}" for err in state.get("error_messages", [])])
    
    return {
        "final_output": f"Content validation failed. Issues found:\n{errors}",
        "processing_status": "validation_failed"
    }

def route_after_validation(state: ContentProcessingState) -> Literal["process", "reject"]:
    """Route based on validation results."""
    if state.get("is_valid", False):
        print("✓ Routing to processing...")
        return "process"
    else:
        print("✗ Routing to rejection...")
        return "reject"

print("✓ Parent workflow nodes created")

In [None]:
# Build the parent workflow that uses the validation subgraph
workflow_builder = StateGraph(ContentProcessingState)

# Add nodes - including the validation subgraph as a node!
workflow_builder.add_node("prepare", prepare_content)
workflow_builder.add_node("validate", validation_subgraph)  # Subgraph as a node!
workflow_builder.add_node("process", process_valid_content)
workflow_builder.add_node("reject", handle_invalid_content)

# Add edges
workflow_builder.add_edge(START, "prepare")
workflow_builder.add_edge("prepare", "validate")

# Conditional routing after validation
workflow_builder.add_conditional_edges(
    "validate",
    route_after_validation,
    {
        "process": "process",
        "reject": "reject"
    }
)

workflow_builder.add_edge("process", END)
workflow_builder.add_edge("reject", END)

# Compile the parent workflow
content_workflow = workflow_builder.compile()

print("✓ Content processing workflow compiled!")

## Visualize the Complete Workflow

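By default, the validation subgraph is drawn as a single `validate` node in the parent graph. Calling `get_graph(xray=True)` instead expands it to show the subgraph's internal nodes: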

In [None]:
from IPython.display import Image, display

try:
    display(Image(content_workflow.get_graph().draw_mermaid_png()))
except Exception:
    print("Graph structure:")
    print("START -> prepare -> validate (subgraph) -> [process|reject] -> END")

## Test the Complete Workflow

In [None]:
# Test with valid content
print("\n" + "="*60)
print("Test 1: Valid Content")
print("="*60)

result = content_workflow.invoke({
    "raw_content": "LangGraph is a powerful framework for building stateful LLM applications with complex workflows.",
    "data": "",
    "validation_result": "",
    "is_valid": False,
    "error_messages": [],
    "processed_content": "",
    "final_output": "",
    "processing_status": ""
})

print(f"\n📊 Status: {result['processing_status']}")
print(f"\n📄 Final Output:\n{result['final_output']}")

In [None]:
# Test with invalid content
print("\n" + "="*60)
print("Test 2: Invalid Content")
print("="*60)

result = content_workflow.invoke({
    "raw_content": "Short",
    "data": "",
    "validation_result": "",
    "is_valid": False,
    "error_messages": [],
    "processed_content": "",
    "final_output": "",
    "processing_status": ""
})

print(f"\n📊 Status: {result['processing_status']}")
print(f"\n📄 Final Output:\n{result['final_output']}")

## Example 3: Data Processing Pipeline with Multiple Subgraphs

Let's build a more complex example with multiple reusable subgraphs for data processing.

In [None]:
# Subgraph 1: Data Extraction
class ExtractionState(TypedDict):
    raw_data: str
    extracted_entities: list[str]
    extracted_keywords: list[str]

def extract_entities(state: ExtractionState) -> ExtractionState:
    """Extract named entities from text."""
    print("   🔍 Extracting entities...")
    
    prompt = f"""Extract all named entities (people, organizations, locations) from this text:

{state['raw_data']}

List them separated by commas."""
    
    response = model.invoke([HumanMessage(content=prompt)])
    entities = [e.strip() for e in response.content.split(',')]
    
    return {"extracted_entities": entities}

def extract_keywords(state: ExtractionState) -> ExtractionState:
    """Extract key themes and topics."""
    print("   🏷️  Extracting keywords...")
    
    prompt = f"""Extract 5 key themes or topics from this text:

{state['raw_data']}

List them separated by commas."""
    
    response = model.invoke([HumanMessage(content=prompt)])
    keywords = [k.strip() for k in response.content.split(',')[:5]]
    
    return {"extracted_keywords": keywords}

# Build extraction subgraph
extraction_builder = StateGraph(ExtractionState)
extraction_builder.add_node("extract_entities", extract_entities)
extraction_builder.add_node("extract_keywords", extract_keywords)
extraction_builder.add_edge(START, "extract_entities")
extraction_builder.add_edge("extract_entities", "extract_keywords")
extraction_builder.add_edge("extract_keywords", END)
extraction_subgraph = extraction_builder.compile()

print("✓ Extraction subgraph created")
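
`extract_entities` and `extract_keywords` split the model's reply on commas. If the reply ends with a trailing comma or contains an empty item, that produces empty strings in the list. A small helper such as the hypothetical `parse_comma_list` below trims whitespace, drops blank items, and applies an optional limit. It cannot repair a reply that wraps the list in prose, so the prompt still matters.

```python
from typing import Optional


def parse_comma_list(text: str, limit: Optional[int] = None) -> list[str]:
    """Hypothetical helper: split on commas, trim whitespace, drop empty items."""
    items = [part.strip() for part in text.split(",")]
    items = [item for item in items if item]
    return items[:limit]  # slicing with limit=None keeps every item


print(parse_comma_list("Apple Inc., Microsoft, , Tim Cook,"))
# ['Apple Inc.', 'Microsoft', 'Tim Cook']
print(parse_comma_list("AI, cloud, NLP, ML, partnerships, software", limit=5))
# ['AI', 'cloud', 'NLP', 'ML', 'partnerships']
```

Inside the nodes, `parse_comma_list(response.content)` and `parse_comma_list(response.content, limit=5)` could replace the inline comprehensions. Splitting on commas still breaks names that themselves contain a comma, such as "Apple, Inc.".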

In [None]:
# Subgraph 2: Data Enrichment
class EnrichmentState(TypedDict):
    extracted_entities: list[str]
    extracted_keywords: list[str]
    enriched_data: str
    summary: str

def enrich_data(state: EnrichmentState) -> EnrichmentState:
    """Enrich extracted data with additional context."""
    print("   💎 Enriching data...")
    
    entities_str = ", ".join(state.get("extracted_entities", []))
    keywords_str = ", ".join(state.get("extracted_keywords", []))
    
    prompt = f"""Provide brief context about these entities and topics:

Entities: {entities_str}
Topics: {keywords_str}

Be concise and informative."""
    
    response = model.invoke([HumanMessage(content=prompt)])
    
    return {"enriched_data": response.content}

def create_summary(state: EnrichmentState) -> EnrichmentState:
    """Create a summary of the enriched data."""
    print("   📝 Creating summary...")
    
    summary = f"""Data Analysis Summary:
- Entities found: {len(state.get('extracted_entities', []))}
- Key topics: {len(state.get('extracted_keywords', []))}

Enrichment:
{state.get('enriched_data', '')}"""
    
    return {"summary": summary}

# Build enrichment subgraph
enrichment_builder = StateGraph(EnrichmentState)
enrichment_builder.add_node("enrich", enrich_data)
enrichment_builder.add_node("summarize", create_summary)
enrichment_builder.add_edge(START, "enrich")
enrichment_builder.add_edge("enrich", "summarize")
enrichment_builder.add_edge("summarize", END)
enrichment_subgraph = enrichment_builder.compile()

print("✓ Enrichment subgraph created")

In [None]:
# Create parent pipeline that uses both subgraphs
class PipelineState(TypedDict):
    raw_data: str
    extracted_entities: list[str]
    extracted_keywords: list[str]
    enriched_data: str
    summary: str
    pipeline_result: str

def prepare_pipeline(state: PipelineState) -> PipelineState:
    """Initialize the pipeline."""
    print("\n🚀 Starting data processing pipeline...\n")
    return {}

def finalize_pipeline(state: PipelineState) -> PipelineState:
    """Finalize pipeline results."""
    print("\n✅ Pipeline complete!\n")
    
    result = f"""Pipeline Processing Complete
{'='*50}

Extracted Entities:
{', '.join(state.get('extracted_entities', []))}

Key Topics:
{', '.join(state.get('extracted_keywords', []))}

{state.get('summary', '')}
"""
    
    return {"pipeline_result": result}

# Build the pipeline
pipeline_builder = StateGraph(PipelineState)

# Add nodes including both subgraphs
pipeline_builder.add_node("prepare", prepare_pipeline)
pipeline_builder.add_node("extract", extraction_subgraph)  # First subgraph
pipeline_builder.add_node("enrich", enrichment_subgraph)   # Second subgraph
pipeline_builder.add_node("finalize", finalize_pipeline)

# Add edges
pipeline_builder.add_edge(START, "prepare")
pipeline_builder.add_edge("prepare", "extract")
pipeline_builder.add_edge("extract", "enrich")
pipeline_builder.add_edge("enrich", "finalize")
pipeline_builder.add_edge("finalize", END)

# Compile
data_pipeline = pipeline_builder.compile()

print("✓ Complete data processing pipeline compiled!")

## Test the Multi-Subgraph Pipeline

In [None]:
# Test the complete pipeline
sample_data = """Apple Inc. announced a new partnership with Microsoft to develop 
AI-powered productivity tools. The collaboration will focus on machine learning, 
natural language processing, and cloud computing technologies. CEO Tim Cook stated 
that this partnership represents a significant milestone in enterprise software innovation."""

result = data_pipeline.invoke({
    "raw_data": sample_data,
    "extracted_entities": [],
    "extracted_keywords": [],
    "enriched_data": "",
    "summary": "",
    "pipeline_result": ""
})

print(result["pipeline_result"])

## Key Benefits of Subgraphs

### 1. Modularity
- Each subgraph is self-contained
- Easy to understand and maintain
- Can be developed and tested independently

### 2. Reusability
- Use the same subgraph in multiple workflows
- Reduce code duplication
- Build a library of reusable components

### 3. Composability
- Combine simple subgraphs into complex workflows
- Mix and match components as needed
- Grow an application by adding or swapping subgraphs instead of enlarging one monolithic graph

### 4. Maintainability
- Change a subgraph in one place, and every workflow built from it picks up the change when recompiled
- Easier debugging and testing
- Clear separation of concerns

## State Management Between Parent and Subgraphs

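Important principles:

1. **Shared State Keys**: When a compiled subgraph is added directly as a node, data moves between parent and child only through the keys both schemas declare. The parent state must therefore include every key the subgraph needs as input and every key whose result the parent wants back.
2. **State Updates**: When the subgraph finishes, its values for those shared keys are written back to the parent state.
3. **State Isolation**: A subgraph sees only the keys declared in its own schema. Parent-only keys such as `raw_content` are not visible inside it.
4. **Type Safety**: Declare each schema as a `TypedDict` so mismatched keys are easy to spot. `TypedDict` documents the expected shape but is not enforced at runtime.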
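
The first principle can be checked mechanically. The sketch below uses `typing.get_type_hints` to compare the key names declared by two `TypedDict` schemas. `missing_keys` is a hypothetical helper, and the schemas are copies of the ones from Example 3. A key that the child declares but the parent lacks stays inside the subgraph. That is harmless for internal scratch values, but it is usually a bug if the parent expects to read the value. The check compares names only, not types.

```python
from typing import TypedDict, get_type_hints


def missing_keys(parent_schema: type, child_schema: type) -> set[str]:
    """Hypothetical helper: keys the child TypedDict declares that the parent lacks."""
    return set(get_type_hints(child_schema)) - set(get_type_hints(parent_schema))


class ExtractionState(TypedDict):
    raw_data: str
    extracted_entities: list[str]
    extracted_keywords: list[str]


class EnrichmentState(TypedDict):
    extracted_entities: list[str]
    extracted_keywords: list[str]
    enriched_data: str
    summary: str


class PipelineState(TypedDict):
    raw_data: str
    extracted_entities: list[str]
    extracted_keywords: list[str]
    enriched_data: str
    summary: str
    pipeline_result: str


class IncompletePipelineState(TypedDict):  # deliberately missing the enrichment keys
    raw_data: str
    pipeline_result: str


print(missing_keys(PipelineState, ExtractionState))  # set()
print(missing_keys(PipelineState, EnrichmentState))  # set()
print(sorted(missing_keys(IncompletePipelineState, EnrichmentState)))
# ['enriched_data', 'extracted_entities', 'extracted_keywords', 'summary']
```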

## Exercise 1: Create a Translation Subgraph

Build a reusable translation subgraph that:
1. Detects the source language
2. Translates to a target language
3. Validates the translation quality

Then use it in a parent workflow that processes multiple documents.

In [None]:
# Your code here!

class TranslationState(TypedDict):
    text: str
    source_language: str
    target_language: str
    translated_text: str
    quality_score: int

# TODO: Create nodes for language detection, translation, and validation
# TODO: Build the translation subgraph
# TODO: Create a parent workflow that uses the translation subgraph
# TODO: Test with sample text
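
Your parent workflow may store documents under different key names than `TranslationState` uses. In that case, one option is to skip adding the subgraph directly as a node and instead call it from inside a node function, translating keys on the way in and out. LangGraph's subgraph documentation describes this pattern for parents and children with different schemas. The sketch below covers only that mapping step, so the rest of the exercise is still yours. `StubTranslationGraph` is a hypothetical stand-in for your compiled subgraph (it only needs an `invoke` method), and the parent keys `document`, `language`, `translation`, and `translation_quality` are illustrative.

```python
class StubTranslationGraph:
    """Hypothetical stand-in for a compiled translation subgraph (exposes .invoke)."""

    def invoke(self, state: dict) -> dict:
        return {**state, "source_language": "fr",
                "translated_text": f"[{state['target_language']}] {state['text']}",
                "quality_score": 8}


translation_subgraph = StubTranslationGraph()


def translate_document(state: dict) -> dict:
    """Parent node: map parent keys -> subgraph keys, invoke, map results back."""
    child_result = translation_subgraph.invoke({
        "text": state["document"],
        "source_language": "",
        "target_language": state["language"],
        "translated_text": "",
        "quality_score": 0,
    })
    return {"translation": child_result["translated_text"],
            "translation_quality": child_result["quality_score"]}


print(translate_document({"document": "Bonjour le monde", "language": "en"}))
# {'translation': '[en] Bonjour le monde', 'translation_quality': 8}
```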

## Exercise 2: Build a Document Processing System

Create a document processing system with three subgraphs:
1. **Parser Subgraph**: Extract structure and metadata
2. **Analyzer Subgraph**: Analyze content and sentiment
3. **Reporter Subgraph**: Generate reports and summaries

Compose them into a complete document processing pipeline.

In [None]:
# Your code here!

# TODO: Define states for each subgraph
# TODO: Create the parser subgraph
# TODO: Create the analyzer subgraph
# TODO: Create the reporter subgraph
# TODO: Compose them into a parent workflow
# TODO: Test with a sample document

## Best Practices for Subgraphs

1. **Single Responsibility**: Each subgraph should have one clear purpose
2. **Clear Interfaces**: Define state carefully with TypedDict
3. **Independent Testing**: Test subgraphs separately before composition
4. **Documentation**: Document what each subgraph does and what state it requires
5. **Error Handling**: Include error handling within subgraphs
6. **State Minimization**: Declare only the fields a subgraph actually reads or writes in its schema
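
To test independently, you can exercise nodes that call `model.invoke` without an API key by swapping in a stub that returns an object with a `.content` attribute, which is what the nodes in this notebook read. `FakeModel`, `FakeReply`, and `make_format_checker` below are hypothetical test helpers. The node is a trimmed copy of `check_format` that receives the model as a parameter instead of using the global. It passes a plain string prompt rather than `[HumanMessage(...)]`; LangChain chat models accept either form.

```python
from dataclasses import dataclass, field


@dataclass
class FakeReply:
    content: str  # the nodes in this notebook read `response.content`


@dataclass
class FakeModel:
    """Hypothetical test double: returns a canned reply and records each prompt."""
    reply: str
    prompts: list = field(default_factory=list)

    def invoke(self, prompt):
        self.prompts.append(prompt)
        return FakeReply(self.reply)


def make_format_checker(model):
    """Hypothetical factory: a trimmed check_format with the model injected."""
    def check_format(state: dict) -> dict:
        errors = list(state.get("error_messages", []))
        prompt = f"Identify formatting issues in this text:\n\n{state['data']}"
        response = model.invoke(prompt)
        if "no issues" not in response.content.lower():
            errors.append(f"Format issues: {response.content[:100]}")
        return {"error_messages": errors}
    return check_format


passing = make_format_checker(FakeModel("No issues found"))
print(passing({"data": "A tidy sentence.", "error_messages": []}))
# {'error_messages': []}

flagging_model = FakeModel("Missing punctuation")
flagging = make_format_checker(flagging_model)
print(flagging({"data": "no punctuation here", "error_messages": []}))
# {'error_messages': ['Format issues: Missing punctuation']}
```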

## Key Takeaways

In this notebook, you learned:

1. ✅ What subgraphs are and why they're useful
2. ✅ How to create reusable graph components
3. ✅ How to compose subgraphs into larger workflows
4. ✅ State management between parent and child graphs
5. ✅ Best practices for building modular applications
6. ✅ Creating complex pipelines by combining simple subgraphs

## Next Steps

Continue to **Topic 7: Persistence** to learn how to save and restore graph state across sessions!