# 104 LangGraph: State Management - Complex Data

**Workshop**: LangGraph 101 - Express Format  
**Duration**: ~25 minutes  
**Difficulty**: Beginner

üìå **Demo Focus**: Instructor demonstrates complex state patterns. ~30% content reduction for workshop.

## Learning Objectives

By completing this notebook, you will:
- Work with multi-field state schemas
- Handle complex data types (lists, dicts, optional fields)
- Understand state accumulation patterns
- Build production-ready SCM workflows

## ‚úèÔ∏è Homework

After workshop, review full version for:
- All field type examples (boolean, dict, list operations)
- Exercise solutions (address objects, NAT policies)
- Extended error handling patterns
- Complete production workflow examples

## Prerequisites

Before starting this notebook, you should have:

- Completed **Notebook 103: State Schemas with TypedDict**
- Understanding of Python type hints and TypedDict
- Basic familiarity with LangGraph state management
- Python 3.11+ environment with required packages installed


## Table of Contents

1. [What You Learned in Notebook 103](#what-you-learned)
2. [Handling Multiple Inputs and Data Types](#multiple-inputs)
3. [Working with Lists and Complex Types](#complex-types)
4. [Understanding State Initialization](#state-initialization)
5. [Exercise: SCM Address Object Type Operations](#exercise)
6. [Real-World Example: Complete SCM Configuration Workflow](#real-world-example)
7. [What's Next](#whats-next)


## Environment Setup

Let's start by importing all the necessary libraries and setting up our environment.


In [None]:
# Core LangGraph imports
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, List, Optional
from IPython.display import Image, display

print("Environment setup complete!")


In [None]:
# Import List for defining list types
from typing import List

print("‚úÖ List type imported!")
print("\nüí° We can now define fields like: interface_metrics: List[int]")

### 7.2 Define Enhanced State

Now let's create a more complex state that handles multiple data types.

**Important naming note**: You can name your state class anything you want! It could be `BottleState`, `MyWorkflowState`, or anything else. We use `FirewallMetricsState` because it clearly describes what the state represents.

Our new state will track:
- `hostname` (str): The firewall device name
- `interface_metrics` (List[int]): List of interface utilization percentages
- `result` (str): The computed summary report

In [None]:
class SecurityRuleState(TypedDict):
    """State for SCM security rule - matches docs/examples/security_policy.py structure.
    
    References:
    - docs/examples/security_policy.py lines 9-20 (basic rule)
    - docs/examples/security_policy.py lines 26-39 (rule with profiles)
    """
    # Basic identification
    name: str                       # Security rule name
    folder: str                     # SCM folder (e.g., 'Texas')
    
    # Zones (SCM API uses from_/to_ not source_zones/dest_zones!)
    from_: List[str]                # Source zones
    to_: List[str]                  # Destination zones
    
    # Addresses
    source: List[str]               # Source addresses
    destination: List[str]          # Destination addresses
    
    # Applications and Services
    application: List[str]          # Applications to allow/deny
    service: List[str]              # Services (usually ['application-default'])
    
    # Action
    action: str                     # "allow", "deny", or "drop"
    
    # Optional fields
    description: Optional[str]      # Rule description
    tag: Optional[List[str]]        # Tags for organization
    
    # Boolean fields
    validated: bool                 # Has rule been validated?
    log_end: Optional[bool]         # Log at session end?
    
    # Nested configuration
    profile_setting: Optional[dict] # Security profiles: {"group": ["best-practice"]}
    
    # Workflow tracking
    result: str                     # Validation/processing result
    errors: List[str]               # List of validation errors

print("‚úÖ SecurityRuleState defined - matches pan-scm-sdk structure!")
print("\nState structure (matching docs/examples/security_policy.py):")
print("  - name: str (rule name)")
print("  - folder: str (SCM folder)")
print("  - from_: List[str] (source zones)")
print("  - to_: List[str] (destination zones)")
print("  - source: List[str] (source addresses)")
print("  - destination: List[str] (destination addresses)")
print("  - application: List[str] (applications)")
print("  - service: List[str] (services)")
print("  - action: str ('allow', 'deny', 'drop')")
print("  - description: Optional[str]")
print("  - tag: Optional[List[str]]")
print("  - validated: bool")
print("  - log_end: Optional[bool]")
print("  - profile_setting: Optional[dict]")
print("  - result: str")
print("  - errors: List[str]")
print("\nüí° Now handling SEVEN different data types:")
print("   1. String: name, folder, action, description, result")
print("   2. List[str]: from_, to_, source, destination, application, service, tag, errors")
print("   3. Boolean: validated, log_end")
print("   4. Optional: description, tag, log_end, profile_setting")
print("   5. Dictionary: profile_setting")
print("\nüìö This matches the structure from docs/examples/security_policy.py!")

### 7.3 Create Processing Node

Now let's build a node that processes list data. This node will:
1. Read the hostname and interface metrics from state
2. Calculate the sum of all interface utilization values
3. Format a nice summary report
4. Return the updated result

**Key Pattern**: We're building healthy habits by including docstrings!

In [None]:
def validate_security_rule(state: SecurityRuleState) -> dict:
    """Node: Validate SCM security rule configuration.

    This node demonstrates comprehensive validation of all field types:
    - Required string fields (name, folder, action)
    - Required list fields (from_, to_, source, destination, application, service)
    - Optional fields (description, tag, log_end, profile_setting)
    - Boolean logic (validated flag)
    
    References:
    - docs/examples/security_policy.py lines 9-20 (basic rule structure)
    - docs/examples/security_policy.py lines 26-39 (rule with profiles)

    Args:
        state: Current state with security rule information

    Returns:
        dict: Partial state update with validation results
    """
    errors = []
    
    # Validate required string fields
    if not state.get("name"):
        errors.append("Rule name is required")
    if not state.get("folder"):
        errors.append("Folder is required")
    
    # Validate action is one of allowed values
    valid_actions = ["allow", "deny", "drop"]
    if state.get("action") not in valid_actions:
        errors.append(f"Action must be one of: {', '.join(valid_actions)}")
    
    # Validate required list fields aren't empty
    if not state.get("from_"):
        errors.append("At least one source zone (from_) is required")
    if not state.get("to_"):
        errors.append("At least one destination zone (to_) is required")
    if not state.get("source"):
        errors.append("At least one source address is required")
    if not state.get("destination"):
        errors.append("At least one destination address is required")
    if not state.get("application"):
        errors.append("At least one application is required")
    if not state.get("service"):
        errors.append("At least one service is required")
    
    # Count rule components (using list lengths)
    zone_count = len(state.get("from_", [])) + len(state.get("to_", []))
    address_count = len(state.get("source", [])) + len(state.get("destination", []))
    app_count = len(state.get("application", []))
    
    # Check optional fields safely
    has_description = bool(state.get("description"))
    has_tags = bool(state.get("tag"))
    has_profiles = bool(state.get("profile_setting"))
    logging_enabled = state.get("log_end", False)
    
    # Generate validation result
    if errors:
        result = f"‚ùå Rule '{state.get('name', 'UNNAMED')}' validation FAILED:\n" + \
                 "\n".join(f"  - {error}" for error in errors)
        validated = False
    else:
        result = f"‚úÖ Rule '{state['name']}' validation PASSED:\n" + \
                 f"  Zones: {zone_count} references (from: {len(state['from_'])}, to: {len(state['to_'])})\n" + \
                 f"  Addresses: {address_count} references\n" + \
                 f"  Applications: {app_count}\n" + \
                 f"  Action: {state['action']}\n" + \
                 f"  Description: {'‚úì' if has_description else '‚úó'}\n" + \
                 f"  Tags: {'‚úì' if has_tags else '‚úó'}\n" + \
                 f"  Security Profiles: {'‚úì' if has_profiles else '‚úó'}\n" + \
                 f"  Logging: {'enabled' if logging_enabled else 'disabled'}"
        validated = True
    
    # Return partial state update
    return {
        "validated": validated,
        "result": result,
        "errors": errors
    }

print("‚úÖ validate_security_rule function defined!")
print("\nüí° This node demonstrates:")
print("   1. Required field validation (name, folder, action)")
print("   2. List field validation (from_, to_, source, destination)")
print("   3. Optional field checking (.get() pattern)")
print("   4. Boolean field updates (validated)")
print("   5. Action validation against allowed values")
print("   6. Comprehensive error tracking")

### 7.4 Build and Compile the Graph

This should look familiar! We follow the same pattern as before:
1. Create the graph with our state schema
2. Add the node
3. Set entry and finish points
4. Compile

In [None]:
# Create new graph with SecurityRuleState schema
security_rule_graph = StateGraph(SecurityRuleState)

# Add the validation node
security_rule_graph.add_node("validate", validate_security_rule)

# Set entry and finish points
security_rule_graph.set_entry_point("validate")
security_rule_graph.set_finish_point("validate")

# Compile the graph
security_app = security_rule_graph.compile()

print("‚úÖ Security rule validation graph built and compiled!")

### 7.5 Visualize the Graph

Let's see what our graph looks like:

In [None]:
# Visualize the security rule validation graph
display(Image(security_app.get_graph().draw_mermaid_png()))

### 7.6 Invoke the Graph

Now let's run it! We'll pass in:
- A firewall hostname
- A list of interface utilization percentages

**Important Common Mistake**: Make sure you invoke the **compiled graph** (metrics_app), not the graph builder (metrics_graph)!

In [None]:
# Test 1: Complete valid security rule (matches docs/examples/security_policy.py lines 9-20)
print("="*70)
print("TEST 1: Complete Valid Security Rule")
print("="*70)

result1 = security_app.invoke({
    "name": "Allow-Web-Traffic",
    "folder": "Texas",
    "from_": ["trust"],
    "to_": ["untrust"],
    "source": ["internal-net"],
    "destination": ["any"],
    "application": ["web-browsing", "ssl"],
    "service": ["application-default"],
    "action": "allow",
    "description": "Allow internal users to browse the web",
    "tag": ["Production", "Web"],
    "validated": False,
    "log_end": True,
    "profile_setting": None,
    "result": "",
    "errors": []
})

print(result1["result"])
print(f"\nValidated: {result1['validated']}")
print(f"Errors: {result1['errors']}")

# Test 2: Rule with security profiles (matches docs/examples/security_policy.py lines 26-39)
print("\n" + "="*70)
print("TEST 2: Rule with Security Profiles")
print("="*70)

result2 = security_app.invoke({
    "name": "Secure-Web-Access",
    "folder": "Texas",
    "from_": ["trust"],
    "to_": ["untrust"],
    "source": ["internal-net"],
    "destination": ["any"],
    "application": ["web-browsing", "ssl"],
    "service": ["application-default"],
    "action": "allow",
    "description": "Web access with security profiles",
    "tag": ["Production", "Security"],
    "validated": False,
    "log_end": True,
    "profile_setting": {"group": ["best-practice"]},  # Nested dictionary!
    "result": "",
    "errors": []
})

print(result2["result"])
print(f"\nProfile Setting: {result2['profile_setting']}")

# Test 3: Invalid rule (missing required fields)
print("\n" + "="*70)
print("TEST 3: Invalid Rule (Missing Required Fields)")
print("="*70)

result3 = security_app.invoke({
    "name": "",  # Missing name!
    "folder": "Texas",
    "from_": [],  # Empty zones!
    "to_": ["untrust"],
    "source": ["any"],
    "destination": ["any"],
    "application": [],  # Empty applications!
    "service": ["application-default"],
    "action": "invalid_action",  # Invalid action!
    "description": None,
    "tag": None,
    "validated": False,
    "log_end": False,
    "profile_setting": None,
    "result": "",
    "errors": []
})

print(result3["result"])
print(f"\nValidated: {result3['validated']}")
print(f"Errors found: {len(result3['errors'])}")
for error in result3['errors']:
    print(f"  - {error}")

print("\n" + "="*70)
print("‚úÖ All Tests Complete!")
print("="*70)

print("\nüí° Key Observations:")
print("   1. ‚úÖ Test 1: All required fields provided ‚Üí validation passed")
print("   2. ‚úÖ Test 2: Optional profile_setting (nested dict) handled correctly")
print("   3. ‚ùå Test 3: Missing/invalid fields caught by validation")
print("\nüìö These examples match docs/examples/security_policy.py structure!")

---

### 7.7 Working with Different Field Types

Now let's explore how to work with each data type in detail. Understanding these patterns is critical for production SCM automation!

#### 7.7.1 String Fields (Required vs Optional)

String fields can be required or optional. Here's how to handle both:

In [None]:
# Example: Working with Optional string fields

def process_optional_fields(state: SecurityRuleState) -> dict:
    """Demonstrate safe handling of Optional string fields."""
    
    # ‚ùå UNSAFE - might crash if description is None
    # result = f"Description length: {len(state['description'])}"
    
    # ‚úÖ SAFE - use .get() with default value
    description = state.get("description", "No description provided")
    result = f"Description: {description}"
    
    # ‚úÖ SAFE - check before accessing
    if state.get("description"):
        result += f"\n  Length: {len(state['description'])} characters"
    else:
        result += "\n  (No description)"
    
    return {"result": result}

# Test with description
test_state = {
    "name": "test-rule",
    "folder": "Texas",
    "from_": ["trust"],
    "to_": ["untrust"],
    "source": ["any"],
    "destination": ["any"],
    "application": ["web-browsing"],
    "service": ["application-default"],
    "action": "allow",
    "description": "This is a test rule",  # Has description
    "tag": None,
    "validated": False,
    "log_end": False,
    "profile_setting": None,
    "result": "",
    "errors": []
}

result = process_optional_fields(test_state)
print("With description:")
print(result["result"])

# Test without description
test_state["description"] = None
result = process_optional_fields(test_state)
print("\nWithout description:")
print(result["result"])

print("\nüí° Key Pattern: Always use .get() for Optional fields!")

---

## Summary

In this notebook, you learned how to work with complex state structures in LangGraph. The key takeaways:

1. **Multi-field state schemas** enable tracking multiple related pieces of information
2. **Type diversity** - State can include strings, lists, integers, booleans, and dictionaries
3. **Safe access patterns** - Always check if optional fields exist before accessing them
4. **State initialization** - Fields only exist after they've been set by a node
5. **Real-world applications** - Complex state structures enable sophisticated network automation workflows

These concepts form the foundation for building production-ready LangGraph applications!
