# Multiple Schemas - Practice Exercises

## Overview
This notebook provides hands-on practice with multiple schemas in LangGraph. The exercises cover private state, input/output schema filtering, and multi-schema architectures.

## Learning Objectives
By the end of these exercises, you will:
- Understand how to use private state between nodes
- Know how to define separate input and output schemas for graphs
- Practice filtering graph inputs and outputs
- Build complex systems with multiple schema types
- Design clean APIs with controlled data exposure

## Prerequisites
- Completed the multiple-schemas.ipynb tutorial
- Understanding of state schema and reducer concepts

In [None]:
%%capture --no-stderr
%pip install --quiet -U langgraph

## Exercise 1: Private State for Internal Processing

### Task
Create a user authentication system in which sensitive information (such as password hashes and session tokens) is passed privately between nodes and is never exposed in the final output.

### TODO: Define public and private schemas

In [None]:
from typing_extensions import TypedDict
from IPython.display import Image, display
from langgraph.graph import StateGraph, START, END

# TODO: Define the public state (what external users see)
class PublicAuthState(TypedDict):
    # TODO: Add public fields: username, is_authenticated, user_role
    pass

# TODO: Define the private state (internal processing data)
class PrivateAuthState(TypedDict):
    # TODO: Add private fields: password_hash, session_token, failed_attempts
    pass

In [None]:
# TODO: Implement authentication nodes
def validate_credentials(state: PublicAuthState) -> PrivateAuthState:
    print(f"Validating credentials for user: {state['username']}")
    # TODO: Simulate password validation and return private state
    # Return password_hash, session_token, failed_attempts
    pass

def check_security_policy(state: PrivateAuthState) -> PrivateAuthState:
    print(f"Checking security policy (failed attempts: {state['failed_attempts']})")
    # TODO: Update failed_attempts or other security metrics
    pass

def generate_session(state: PrivateAuthState) -> PublicAuthState:
    print(f"Generating session with token: {state['session_token'][:8]}...")
    # TODO: Return public authentication result
    # Set is_authenticated=True and appropriate user_role
    pass

In [None]:
# TODO: Build authentication graph
builder_auth = StateGraph(PublicAuthState)
# TODO: Add nodes with proper schema types
# TODO: Create flow: validate_credentials -> check_security_policy -> generate_session

graph_auth = builder_auth.compile()
display(Image(graph_auth.get_graph().draw_mermaid_png()))

In [None]:
# TODO: Test authentication with private state
auth_input = {
    "username": "alice@example.com",
    "is_authenticated": False,
    "user_role": ""
}

result_auth = graph_auth.invoke(auth_input)
print("Authentication result:", result_auth)
# Notice: private fields like password_hash and session_token should not appear in output

## Exercise 2: Input/Output Schema Filtering

### Task
Create a data processing pipeline with controlled input/output schemas. The internal processing uses more fields than what's exposed externally.
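
Conceptually, the input and output schemas act as key filters at the graph boundary. The plain-Python sketch below imitates that effect on ordinary dictionaries. `project` is a hypothetical helper written for illustration, not part of LangGraph, and the toy schemas and values are assumptions.

```python
from typing import Any, Dict, List, TypedDict


class ToyInput(TypedDict):
    raw_data: List[int]


class ToyOutput(TypedDict):
    total: int


def project(state: Dict[str, Any], schema: type) -> Dict[str, Any]:
    """Keep only the keys that `schema` declares (hypothetical helper)."""
    allowed = schema.__annotations__.keys()
    return {key: value for key, value in state.items() if key in allowed}


# Full internal state after processing: input, output, and scratch fields.
internal_state = {"raw_data": [1, 2, 3], "total": 6, "processing_time": 0.01}

visible_input = project(internal_state, ToyInput)
visible_output = project(internal_state, ToyOutput)
print(visible_input)
print(visible_output)
```

In the exercise, the same separation is requested declaratively through the `input_schema` and `output_schema` arguments rather than by filtering dictionaries by hand.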

### TODO: Define input, output, and internal schemas

In [None]:
from typing import List, Dict, Any

# TODO: Define input schema (what users provide)
class DataInput(TypedDict):
    # TODO: Add raw_data (List[Dict[str, Any]]) and processing_type (str)
    pass

# TODO: Define output schema (what users receive)
class DataOutput(TypedDict):
    # TODO: Add processed_data (List[Dict[str, Any]]) and summary (Dict[str, Any])
    pass

# TODO: Define internal schema (full processing state)
class DataProcessingState(TypedDict):
    # TODO: Include every field from the input and output schemas:
    #   raw_data, processing_type, processed_data, summary
    # plus the internal-only fields:
    #   intermediate_results, error_log, processing_time
    pass

In [None]:
# TODO: Implement processing nodes
def validate_input(state: DataInput):
    print(f"Validating input data ({len(state['raw_data'])} records)")
    # TODO: Initialize internal processing state
    # Return intermediate_results=[], error_log=[], processing_time=0.0
    pass

def process_data(state: DataProcessingState):
    print(f"Processing data with type: {state['processing_type']}")
    # TODO: Process raw_data based on processing_type
    # Update processed_data, intermediate_results, processing_time
    pass

def generate_summary(state: DataProcessingState) -> DataOutput:
    print(f"Generating summary (processing time: {state['processing_time']}s)")
    # TODO: Create summary from processed data
    # Return only DataOutput fields
    pass

In [None]:
# TODO: Build graph with input/output schema filtering
builder_data = StateGraph(
    DataProcessingState,       # Internal state shared by all nodes
    input_schema=DataInput,    # Fields accepted by invoke()
    output_schema=DataOutput   # Fields returned by invoke()
)

# TODO: Add nodes and edges
# validate_input -> process_data -> generate_summary

graph_data = builder_data.compile()
display(Image(graph_data.get_graph().draw_mermaid_png()))

In [None]:
# TODO: Test with input/output filtering
data_input = {
    "raw_data": [
        {"name": "Alice", "age": 30, "score": 85},
        {"name": "Bob", "age": 25, "score": 92},
        {"name": "Charlie", "age": 35, "score": 78}
    ],
    "processing_type": "statistical_analysis"
}

result_data = graph_data.invoke(data_input)
print("Data processing result:", result_data)
# Notice: internal fields like intermediate_results and processing_time should be filtered out

## Exercise 3: Multi-Stage Pipeline with Different Schemas

### Task
Create a complex document processing pipeline where different stages use different schemas, simulating a real-world document analysis system.
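
The first stage, for instance, might compute simple counts before later stages build on them. The sketch below shows one naive way to do that in plain Python. The function name `analyze_text_sketch` is hypothetical, the sentence splitter is a rough heuristic, and language detection and sentiment scoring are deliberately left out.

```python
import re
from typing import Any, Dict


def analyze_text_sketch(state: Dict[str, Any]) -> Dict[str, Any]:
    """Return a partial state update with simple text statistics."""
    text = state["document_text"]
    words = text.split()
    # Treat runs of ., !, or ? as sentence boundaries and drop empty fragments.
    sentences = [s for s in re.split(r"[.!?]+", text) if s.strip()]
    return {
        "word_count": len(words),
        "sentence_count": len(sentences),
        "processing_stage": "text_analysis",
    }


update = analyze_text_sketch(
    {"document_text": "LangGraph is great. It keeps state!", "document_type": "note"}
)
print(update)
```

Returning only the keys that changed, rather than the whole state, matches how the nodes in this notebook are expected to report updates.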

### TODO: Define schemas for different processing stages

In [None]:
# TODO: Define document input schema
class DocumentInput(TypedDict):
    # TODO: Add document_text and document_type
    pass

# TODO: Define text analysis schema
class TextAnalysisState(TypedDict):
    # TODO: Add word_count, sentence_count, language, sentiment_score
    pass

# TODO: Define content extraction schema
class ContentExtractionState(TypedDict):
    # TODO: Add entities, keywords, topics, summary
    pass

# TODO: Define final output schema
class DocumentAnalysisOutput(TypedDict):
    # TODO: Add analysis_report (combining key insights) and confidence_score
    pass

# TODO: Define comprehensive internal schema
class DocumentProcessingState(TypedDict):
    # TODO: Include all fields from the schemas above:
    #   document_text, document_type, word_count, sentence_count, language,
    #   sentiment_score, entities, keywords, topics, summary, analysis_report,
    #   confidence_score
    # plus the internal-only fields:
    #   processing_stage, error_messages
    pass

In [None]:
# TODO: Implement multi-stage processing nodes
def analyze_text(state: DocumentInput) -> TextAnalysisState:
    print(f"Analyzing text for document type: {state['document_type']}")
    text = state['document_text']
    # TODO: Perform basic text analysis
    # Calculate word_count, sentence_count, detect language, sentiment_score
    # Also set processing_stage="text_analysis"
    pass

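def extract_content(state: TextAnalysisState) -> ContentExtractionState:
    print(f"Extracting content (language: {state['language']})")
    # TODO: Extract entities, keywords, topics, and create summary
    # Note: a node receives only the keys of its annotated input schema, so
    # document_text is not visible here unless that schema declares it
    # Also set processing_stage="content_extraction"
    pass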

def generate_report(state: ContentExtractionState) -> DocumentAnalysisOutput:
    print("Generating analysis report")
    # TODO: Create comprehensive analysis_report and calculate confidence_score
    # Also set processing_stage="report_generation"
    pass

In [None]:
# TODO: Build multi-stage document processing graph
builder_doc = StateGraph(
    DocumentProcessingState,
    input_schema=DocumentInput,
    output_schema=DocumentAnalysisOutput
)

# TODO: Add nodes and connect them in sequence
# analyze_text -> extract_content -> generate_report

graph_doc = builder_doc.compile()
display(Image(graph_doc.get_graph().draw_mermaid_png()))

In [None]:
# TODO: Test document processing pipeline
sample_document = {
    "document_text": """
    LangGraph is a powerful framework for building stateful, multi-actor applications with LLMs. 
    It enables developers to create complex workflows that can maintain context and state across 
    multiple interactions. The framework provides excellent support for human-in-the-loop patterns 
    and sophisticated error handling. Overall, it represents a significant advancement in the field 
    of language model applications.
    """,
    "document_type": "technical_article"
}

result_doc_analysis = graph_doc.invoke(sample_document)
print("Document analysis result:", result_doc_analysis)

## Exercise 4: API Gateway Pattern with Schema Transformation

### Task
Create an API gateway that accepts requests in one format, processes them through multiple internal services with different schemas, and returns a unified response format.
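
The bookkeeping at both ends of the gateway can be tried out as plain functions before any graph is wired. In the sketch below, the function names (`process_request_sketch`, `format_response_sketch`), the `"ok"` and `"request failed"` messages, and the sample calculation value are all assumptions made for illustration. The manual dictionary merge only stands in for the state updates a graph would apply.

```python
import time
import uuid
from typing import Any, Dict


def process_request_sketch(state: Dict[str, Any]) -> Dict[str, Any]:
    """Initialise internal bookkeeping for one request."""
    return {
        "request_id": str(uuid.uuid4()),
        "processing_start_time": time.time(),
        "service_calls": [],
        "errors": [],
    }


def format_response_sketch(state: Dict[str, Any]) -> Dict[str, Any]:
    """Collapse internal results into a unified response shape."""
    failed = bool(state.get("errors"))
    return {
        "success": not failed,
        "data": state.get("calculations", {}),
        "message": "request failed" if failed else "ok",
    }


request = {"user_id": "user_12345", "action": "get_user_stats", "parameters": {}}
# Merge the update by hand; a graph would do this between nodes.
internal = {**request, **process_request_sketch(request)}
internal["calculations"] = {"logins_last_30d": 4}  # made-up value for illustration
response = format_response_sketch(internal)
print(response)
```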

### TODO: Define API gateway schemas

In [None]:
from typing import Optional

# TODO: Define external API request schema
class APIRequest(TypedDict):
    # TODO: Add user_id, action, parameters
    pass

# TODO: Define external API response schema
class APIResponse(TypedDict):
    # TODO: Add success, data, message, request_id
    pass

# TODO: Define user service schema
class UserServiceState(TypedDict):
    # TODO: Add user_profile, permissions, last_login
    pass

# TODO: Define business logic schema
class BusinessLogicState(TypedDict):
    # TODO: Add business_rules, calculations, validations
    pass

# TODO: Define comprehensive internal schema
class GatewayInternalState(TypedDict):
    # TODO: Include all fields from above schemas plus:
    # request_id, processing_start_time, service_calls, errors
    pass

In [None]:
import uuid
import time

# TODO: Implement API gateway nodes
def process_request(state: APIRequest):
    print(f"Processing API request for user: {state['user_id']}")
    # TODO: Initialize internal processing state
    # Generate request_id, set processing_start_time, initialize service_calls and errors
    pass

def call_user_service(state: GatewayInternalState) -> UserServiceState:
    print(f"Calling user service for request: {state['request_id']}")
    # TODO: Simulate user service call
    # Return user_profile, permissions, last_login
    # Update service_calls list
    pass

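def execute_business_logic(state: UserServiceState) -> BusinessLogicState:
    print("Executing business logic")
    # TODO: Process based on action and user permissions
    # Note: action is not a UserServiceState field; annotate the node with a
    # schema that declares it (e.g. GatewayInternalState) to read it here
    # Return business_rules, calculations, validations
    pass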

def format_response(state: BusinessLogicState) -> APIResponse:
    print("Formatting API response")
    # TODO: Create unified response format
    # Set success, data, message; request_id is already in the graph state,
    # so it reaches the output through the APIResponse schema
    pass

In [None]:
# TODO: Build API gateway graph
builder_api = StateGraph(
    GatewayInternalState,
    input_schema=APIRequest,
    output_schema=APIResponse
)

# TODO: Add nodes and edges
# process_request -> call_user_service -> execute_business_logic -> format_response

graph_api = builder_api.compile()
display(Image(graph_api.get_graph().draw_mermaid_png()))

In [None]:
# TODO: Test API gateway
api_request = {
    "user_id": "user_12345",
    "action": "get_user_stats",
    "parameters": {"include_history": True, "date_range": "30d"}
}

api_response = graph_api.invoke(api_request)
print("API Gateway Response:", api_response)
# The response should contain only APIResponse fields; internal processing details are filtered out

## Exercise 5: Conditional Schema Routing

### Task
Create a system where different paths through the graph use different schemas based on the input type or processing requirements.
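
A routing function for this kind of graph only has to map a state field to a node name. The sketch below uses a lookup table. The names `ROUTES` and `route_by_type_sketch` are hypothetical, and raising `ValueError` for unknown types is one possible design choice, not something the exercise prescribes.

```python
from typing import Any, Dict, Literal

NodeName = Literal["process_text", "process_image", "process_audio"]

# Lookup table from the request's processing_type to the node that handles it.
ROUTES: Dict[str, NodeName] = {
    "text": "process_text",
    "image": "process_image",
    "audio": "process_audio",
}


def route_by_type_sketch(state: Dict[str, Any]) -> NodeName:
    """Return the name of the node that should handle this request."""
    processing_type = state["processing_type"]
    if processing_type not in ROUTES:
        # Fail loudly instead of silently picking a default branch.
        raise ValueError(f"Unsupported processing_type: {processing_type!r}")
    return ROUTES[processing_type]


for kind in ("text", "image", "audio"):
    print(kind, "->", route_by_type_sketch({"processing_type": kind}))
```

A function of this shape is what `add_conditional_edges` expects as its routing callable, with `START` as the source when routing happens at the entry point.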

### TODO: Define conditional schema system

In [None]:
from typing import Literal

# TODO: Define base input schema
class ProcessingRequest(TypedDict):
    # TODO: Add content, processing_type, priority
    pass

# TODO: Define text processing schema
class TextProcessingState(TypedDict):
    # TODO: Add text_content, language, nlp_results
    pass

# TODO: Define image processing schema
class ImageProcessingState(TypedDict):
    # TODO: Add image_metadata, vision_results, processed_image_path
    pass

# TODO: Define audio processing schema
class AudioProcessingState(TypedDict):
    # TODO: Add audio_metadata, transcription, audio_features
    pass

# TODO: Define unified output schema
class ProcessingResult(TypedDict):
    # TODO: Add processing_type, results, confidence, processing_time
    pass

In [None]:
# TODO: Implement routing logic
def route_by_type(state: ProcessingRequest) -> Literal["process_text", "process_image", "process_audio"]:
    # TODO: Route based on processing_type
    processing_type = state['processing_type']
    # Return appropriate node name based on type
    pass

# TODO: Implement type-specific processing nodes
def process_text(state: ProcessingRequest) -> TextProcessingState:
    print("Processing text content")
    # TODO: Extract text from content and process it
    pass

def process_image(state: ProcessingRequest) -> ImageProcessingState:
    print("Processing image content")
    # TODO: Extract image metadata and process it
    pass

def process_audio(state: ProcessingRequest) -> AudioProcessingState:
    print("Processing audio content")
    # TODO: Extract audio features and process it
    pass

# TODO: Implement unified result formatter
def format_results(state) -> ProcessingResult:
    print("Formatting processing results")
    # TODO: Create unified result format regardless of processing type
    # This node needs to handle different input schema types
    pass

In [None]:
# TODO: Build conditional routing graph
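# Hint: as in Exercises 2-4, consider a comprehensive internal state together
# with input_schema/output_schema, so the graph declares every field the nodes
# write and returns only ProcessingResult fields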
builder_conditional = StateGraph(ProcessingRequest)  # Base state for routing

# TODO: Add all processing nodes
# TODO: Add conditional edges from START based on processing type
# TODO: Connect all processing nodes to format_results

graph_conditional = builder_conditional.compile()
display(Image(graph_conditional.get_graph().draw_mermaid_png()))

In [None]:
# TODO: Test conditional routing with different content types
test_requests = [
    {
        "content": "This is a sample text for natural language processing.",
        "processing_type": "text",
        "priority": "high"
    },
    {
        "content": "path/to/image.jpg",
        "processing_type": "image", 
        "priority": "medium"
    },
    {
        "content": "path/to/audio.wav",
        "processing_type": "audio",
        "priority": "low"
    }
]

for i, request in enumerate(test_requests):
    print(f"\n--- Test {i+1}: {request['processing_type']} processing ---")
    result = graph_conditional.invoke(request)
    print(f"Result: {result}")

## Challenge Exercise: Schema Migration System

### Task
Create a system that can handle different versions of schemas for backward compatibility, demonstrating how to manage schema evolution in production systems.
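
One common way to structure such a system is a chain of small upgrade steps, each adding defaults for the fields introduced by the next version. The sketch below follows that idea in plain Python. The helper names, the field-presence detection rule, and the default values (empty strings and empty dictionaries) are assumptions chosen for illustration.

```python
from typing import Any, Dict


def detect_version(data: Dict[str, Any]) -> str:
    """Guess the schema version from which fields are present."""
    if "preferences" in data or "metadata" in data:
        return "v3"
    if "phone" in data or "address" in data:
        return "v2"
    return "v1"


def v1_to_v2(data: Dict[str, Any]) -> Dict[str, Any]:
    # V2 adds contact fields; empty strings are placeholder defaults.
    return {**data, "phone": "", "address": ""}


def v2_to_v3(data: Dict[str, Any]) -> Dict[str, Any]:
    # V3 adds free-form dictionaries; empty dicts are placeholder defaults.
    return {**data, "preferences": {}, "metadata": {}}


def migrate_to_latest(data: Dict[str, Any]) -> Dict[str, Any]:
    """Apply each upgrade step in order until the data is V3."""
    version = detect_version(data)
    if version == "v1":
        data = v1_to_v2(data)
        version = "v2"
    if version == "v2":
        data = v2_to_v3(data)
    return data


legacy_user = {"name": "Alice", "email": "alice@example.com"}
latest_user = migrate_to_latest(legacy_user)
print(latest_user)
```

Chaining single-step migrations keeps each function small and means a future V4 only needs one new step. Each step returns a new dictionary, so the original record is left unchanged.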

### TODO: Implement schema versioning system

In [None]:
# TODO: Define different schema versions
class UserDataV1(TypedDict):
    # TODO: Add basic fields: name, email
    pass

class UserDataV2(TypedDict):
    # TODO: Add extended fields: name, email, phone, address
    pass

class UserDataV3(TypedDict):
    # TODO: Add latest fields: name, email, phone, address, preferences, metadata
    pass

# TODO: Implement schema migration functions
def migrate_v1_to_v2(v1_data: UserDataV1) -> UserDataV2:
    # TODO: Migrate V1 to V2 by adding default values
    pass

def migrate_v2_to_v3(v2_data: UserDataV2) -> UserDataV3:
    # TODO: Migrate V2 to V3 by adding default values
    pass

def detect_schema_version(data: dict) -> str:
    # TODO: Detect schema version based on available fields
    pass

print("Schema migration system defined - implement the migration logic!")

## Summary

In these exercises, you've practiced:
- Using private state for internal processing that's not exposed in outputs
- Defining separate input/output schemas to control API boundaries
- Building multi-stage pipelines with different schemas at each stage
- Creating API gateway patterns with schema transformation
- Implementing conditional routing based on schema types
- Managing schema versioning and migration

Key takeaways:
- **Private State**: Enables secure internal processing without exposing sensitive data
- **Input/Output Schemas**: Provide clean API boundaries and control data exposure
- **Multi-Schema Systems**: Allow complex workflows with different data requirements
- **Schema Transformation**: Decouples external request and response formats from internal service schemas, so each can change independently
- **Conditional Schemas**: Enable flexible processing based on input characteristics
- **Version Management**: Critical for production systems that need to evolve

These patterns are essential for building production-ready LangGraph applications that need to handle complex data flows while maintaining clean interfaces and security boundaries.

Next, continue with the trim-filter-messages exercises to learn about message management in conversational systems!