# 104 LangGraph: State Management - Working with Complex Data

**Duration:** ~35 minutes  
**Difficulty:** Beginner  
**Prerequisites:** Notebook 103

## Learning Objectives

By the end of this notebook, you will be able to:

1. **Design multi-field state schemas with TypedDict** - Create complex state structures that track multiple pieces of information
2. **Work with diverse data types** - Handle strings, lists, integers, booleans, and dictionaries within state
3. **Implement safe state access patterns** - Understand how to safely access and initialize state fields
4. **Handle optional fields and defaults** - Manage optional state fields and provide sensible defaults
5. **Build complex SCM object structures** - Apply state management to real-world Palo Alto Networks configurations


## Prerequisites

Before starting this notebook, you should have:

- Completed **Notebook 103: State Schemas with TypedDict**
- Understanding of Python type hints and TypedDict
- Basic familiarity with LangGraph state management
- Python 3.11+ environment with required packages installed


## Table of Contents

1. [What You Learned in Notebook 103](#what-you-learned)
2. [Handling Multiple Inputs and Data Types](#multiple-inputs)
3. [Working with Lists and Complex Types](#complex-types)
4. [Understanding State Initialization](#state-initialization)
5. [Exercise: SCM Address Object Type Operations](#exercise)
6. [Real-World Example: Complete SCM Configuration Workflow](#real-world-example)
7. [What's Next](#whats-next)


<a id='what-you-learned'></a>

## What You Learned in Notebook 103

In the previous notebook, you learned:

- **TypedDict Basics** - How to define type-safe state schemas using TypedDict
- **Single Field States** - Working with simple state structures containing one or two fields
- **Type Annotations** - Using Python type hints (str, int, list) to define field types
- **State Updates** - How nodes return dictionaries that update the state
- **Basic Reducers** - Introduction to the Annotated type and simple reducers

Now we'll build on these concepts by working with **more complex state structures** that include multiple fields and diverse data types.


## Environment Setup

Let's start by importing all the necessary libraries and setting up our environment.


In [None]:
# Core LangGraph imports
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, List, Optional
from IPython.display import Image, display

print("Environment setup complete!")


<a id='multiple-inputs'></a>

---

## Handling Multiple Inputs and Data Types

Great work so far! You've learned the fundamentals of LangGraph state management. Now we're going to level up by working with **more complex state structures** that include:

- Multiple fields tracking different aspects of your workflow
- Different data types: strings, lists, integers, booleans
- Real-world scenarios where you need to track relationships between data

This is where LangGraph really shines for network automation - you can track rule names, zones, applications, and validation status all in one cohesive state structure.


<a id='complex-types'></a>

### 7.1 Import Additional Type

We use `List` from `typing` to define list fields in our state (zone lists, tag lists, and so on). It was already imported in the setup cell; importing it again here is harmless and keeps this section self-contained:

In [None]:
# Import List for defining list types
from typing import List

print("✅ List type imported!")
print("\n💡 We can now define fields like: from_: List[str]")

### 💡 Looking Ahead: Reducers for List Fields

In more advanced LangGraph workflows (Notebooks 110-111, with AI agents), you can use **reducers** to control how list fields are updated automatically:

```python
from typing import Annotated, List, TypedDict
from operator import add

class RuleState(TypedDict):
    name: str
    errors: Annotated[List[str], add]  # Updates are appended to the existing list instead of replacing it
```

**What does this do?**

Without reducer:
```python
# Manual approach - you must handle the append logic
def my_node(state: RuleState) -> dict:
    current_errors = state.get("errors", [])
    new_errors = current_errors + ["new error"]
    return {"errors": new_errors}  # Must manually append
```

With reducer:
```python
# With Annotated[List[str], add] - automatic append!
def my_node(state: RuleState) -> dict:
    return {"errors": ["new error"]}  # Automatically appends to existing list!
```

**Why is this powerful?**

- ✅ **Simplifies node logic**: Nodes just return new items, LangGraph handles appending
- ✅ **Prevents bugs**: Can't accidentally replace the entire list
- ✅ **Perfect for errors**: Error messages accumulate across nodes automatically
- ✅ **Essential for AI agents**: Message history uses `add_messages` reducer (Notebook 110)

**When you'll use this:**

- Notebook 110: `add_messages` reducer for conversation history
- Notebook 111: Custom reducers for complex state updates
- Production: Any time you want automatic list accumulation

For now, we'll use manual append (`current + new`) to understand the mechanics. You'll graduate to reducers when building AI agents!

💡 **Remember**: For a simple append, a reducer produces the same result as manual concatenation. It just moves the merge logic out of your node.
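
To make the mechanics concrete, here is a plain-Python sketch of how a state update might be merged with and without a reducer. The `merge_update` helper and the `reducers` mapping are hypothetical names invented for this illustration. They only approximate the behavior described above and are not LangGraph's actual implementation.

```python
from operator import add


# Hypothetical helper for illustration only (not a LangGraph API).
def merge_update(state: dict, update: dict, reducers: dict) -> dict:
    """Merge a partial update into a copy of the state."""
    merged = dict(state)
    for key, value in update.items():
        if key in reducers and key in merged:
            # A reducer combines the old and new values
            merged[key] = reducers[key](merged[key], value)
        else:
            # Without a reducer, the new value replaces the old one
            merged[key] = value
    return merged


state = {"name": "Allow-Web-Traffic", "errors": ["zone missing"]}
update = {"errors": ["action invalid"]}

replaced = merge_update(state, update, reducers={})
appended = merge_update(state, update, reducers={"errors": add})

print(replaced["errors"])  # ['action invalid']
print(appended["errors"])  # ['zone missing', 'action invalid']
```

Without a reducer the node's return value overwrites the field, which is why the manual pattern has to read the current list first and return the concatenated result.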

### 7.2 Define Enhanced State

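Now let's create a more complex state that handles multiple data types.

**Important naming note**: You can name your state class anything you want! It could be `BottleState`, `MyWorkflowState`, or anything else. We use `SecurityRuleState` because it clearly describes what the state represents.

Our new state will track:
- `name`, `folder`, `action` (str): Rule identification and the action to take
- `from_`, `to_`, `source`, `destination`, `application`, `service` (List[str]): Zones, addresses, applications, and services
- `description`, `tag`, `log_end`, `profile_setting` (Optional): Fields whose value may be `None`
- `validated` (bool): Whether the rule has passed validation
- `result` (str) and `errors` (List[str]): Workflow tracking output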

In [None]:
class SecurityRuleState(TypedDict):
    """State for SCM security rule - matches docs/examples/security_policy.py structure.
    
    References:
    - docs/examples/security_policy.py lines 9-20 (basic rule)
    - docs/examples/security_policy.py lines 26-39 (rule with profiles)
    """
    # Basic identification
    name: str                       # Security rule name
    folder: str                     # SCM folder (e.g., 'Texas')
    
    # Zones (SCM API uses from_/to_ not source_zones/dest_zones!)
    from_: List[str]                # Source zones
    to_: List[str]                  # Destination zones
    
    # Addresses
    source: List[str]               # Source addresses
    destination: List[str]          # Destination addresses
    
    # Applications and Services
    application: List[str]          # Applications to allow/deny
    service: List[str]              # Services (usually ['application-default'])
    
    # Action
    action: str                     # "allow", "deny", or "drop"
    
    # Optional fields
    description: Optional[str]      # Rule description
    tag: Optional[List[str]]        # Tags for organization
    
    # Boolean fields
    validated: bool                 # Has rule been validated?
    log_end: Optional[bool]         # Log at session end?
    
    # Nested configuration
    profile_setting: Optional[dict] # Security profiles: {"group": ["best-practice"]}
    
    # Workflow tracking
    result: str                     # Validation/processing result
    errors: List[str]               # List of validation errors

print("✅ SecurityRuleState defined - matches pan-scm-sdk structure!")
print("\nState structure (matching docs/examples/security_policy.py):")
print("  - name: str (rule name)")
print("  - folder: str (SCM folder)")
print("  - from_: List[str] (source zones)")
print("  - to_: List[str] (destination zones)")
print("  - source: List[str] (source addresses)")
print("  - destination: List[str] (destination addresses)")
print("  - application: List[str] (applications)")
print("  - service: List[str] (services)")
print("  - action: str ('allow', 'deny', 'drop')")
print("  - description: Optional[str]")
print("  - tag: Optional[List[str]]")
print("  - validated: bool")
print("  - log_end: Optional[bool]")
print("  - profile_setting: Optional[dict]")
print("  - result: str")
print("  - errors: List[str]")
print("\n💡 Now handling FIVE kinds of type annotation:")
print("   1. String: name, folder, action, description, result")
print("   2. List[str]: from_, to_, source, destination, application, service, tag, errors")
print("   3. Boolean: validated, log_end")
print("   4. Optional: description, tag, log_end, profile_setting")
print("   5. Dictionary: profile_setting")
print("\n📚 This matches the structure from docs/examples/security_policy.py!")

### 7.3 Create Processing Node

Now let's build a node that validates the security rule. This node will:
1. Check that the required string fields (`name`, `folder`) are present and that `action` is an allowed value
2. Check that the required list fields are not empty
3. Safely inspect the optional fields
4. Return a partial state update containing `validated`, `result`, and `errors`

**Key Pattern**: We're building healthy habits by including docstrings!

In [None]:
def validate_security_rule(state: SecurityRuleState) -> dict:
    """Node: Validate SCM security rule configuration.

    This node demonstrates comprehensive validation of all field types:
    - Required string fields (name, folder, action)
    - Required list fields (from_, to_, source, destination, application, service)
    - Optional fields (description, tag, log_end, profile_setting)
    - Boolean logic (validated flag)
    
    References:
    - docs/examples/security_policy.py lines 9-20 (basic rule structure)
    - docs/examples/security_policy.py lines 26-39 (rule with profiles)

    Args:
        state: Current state with security rule information

    Returns:
        dict: Partial state update with validation results
    """
    errors = []
    
    # Validate required string fields
    if not state.get("name"):
        errors.append("Rule name is required")
    if not state.get("folder"):
        errors.append("Folder is required")
    
    # Validate action is one of allowed values
    valid_actions = ["allow", "deny", "drop"]
    if state.get("action") not in valid_actions:
        errors.append(f"Action must be one of: {', '.join(valid_actions)}")
    
    # Validate required list fields aren't empty
    if not state.get("from_"):
        errors.append("At least one source zone (from_) is required")
    if not state.get("to_"):
        errors.append("At least one destination zone (to_) is required")
    if not state.get("source"):
        errors.append("At least one source address is required")
    if not state.get("destination"):
        errors.append("At least one destination address is required")
    if not state.get("application"):
        errors.append("At least one application is required")
    if not state.get("service"):
        errors.append("At least one service is required")
    
    # Count rule components (using list lengths)
    zone_count = len(state.get("from_", [])) + len(state.get("to_", []))
    address_count = len(state.get("source", [])) + len(state.get("destination", []))
    app_count = len(state.get("application", []))
    
    # Check optional fields safely
    has_description = bool(state.get("description"))
    has_tags = bool(state.get("tag"))
    has_profiles = bool(state.get("profile_setting"))
    logging_enabled = state.get("log_end", False)
    
    # Generate validation result
    if errors:
        result = f"❌ Rule '{state.get('name', 'UNNAMED')}' validation FAILED:\n" + \
                 "\n".join(f"  - {error}" for error in errors)
        validated = False
    else:
        result = f"✅ Rule '{state['name']}' validation PASSED:\n" + \
                 f"  Zones: {zone_count} references (from: {len(state['from_'])}, to: {len(state['to_'])})\n" + \
                 f"  Addresses: {address_count} references\n" + \
                 f"  Applications: {app_count}\n" + \
                 f"  Action: {state['action']}\n" + \
                 f"  Description: {'✓' if has_description else '✗'}\n" + \
                 f"  Tags: {'✓' if has_tags else '✗'}\n" + \
                 f"  Security Profiles: {'✓' if has_profiles else '✗'}\n" + \
                 f"  Logging: {'enabled' if logging_enabled else 'disabled'}"
        validated = True
    
    # Return partial state update
    return {
        "validated": validated,
        "result": result,
        "errors": errors
    }

print("✅ validate_security_rule function defined!")
print("\n💡 This node demonstrates:")
print("   1. Required field validation (name, folder, action)")
print("   2. List field validation (from_, to_, source, destination)")
print("   3. Optional field checking (.get() pattern)")
print("   4. Boolean field updates (validated)")
print("   5. Action validation against allowed values")
print("   6. Comprehensive error tracking")

### 7.4 Build and Compile the Graph

This should look familiar! We follow the same pattern as before:
1. Create the graph with our state schema
2. Add the node
3. Set entry and finish points
4. Compile

In [None]:
# Create new graph with SecurityRuleState schema
security_rule_graph = StateGraph(SecurityRuleState)

# Add the validation node
security_rule_graph.add_node("validate", validate_security_rule)

# Set entry and finish points
security_rule_graph.set_entry_point("validate")
security_rule_graph.set_finish_point("validate")

# Compile the graph
security_app = security_rule_graph.compile()

print("✅ Security rule validation graph built and compiled!")

### 7.5 Visualize the Graph

Let's see what our graph looks like:

In [None]:
# Visualize the security rule validation graph
display(Image(security_app.get_graph().draw_mermaid_png()))

### 7.6 Invoke the Graph

Now let's run it! We'll invoke the graph three times:
- A complete, valid security rule
- A rule with a nested `profile_setting` dictionary
- An invalid rule with missing and malformed fields

**Important Common Mistake**: Make sure you invoke the **compiled graph** (`security_app`), not the graph builder (`security_rule_graph`)!

In [None]:
# Test 1: Complete valid security rule (matches docs/examples/security_policy.py lines 9-20)
print("="*70)
print("TEST 1: Complete Valid Security Rule")
print("="*70)

result1 = security_app.invoke({
    "name": "Allow-Web-Traffic",
    "folder": "Texas",
    "from_": ["trust"],
    "to_": ["untrust"],
    "source": ["internal-net"],
    "destination": ["any"],
    "application": ["web-browsing", "ssl"],
    "service": ["application-default"],
    "action": "allow",
    "description": "Allow internal users to browse the web",
    "tag": ["Production", "Web"],
    "validated": False,
    "log_end": True,
    "profile_setting": None,
    "result": "",
    "errors": []
})

print(result1["result"])
print(f"\nValidated: {result1['validated']}")
print(f"Errors: {result1['errors']}")

# Test 2: Rule with security profiles (matches docs/examples/security_policy.py lines 26-39)
print("\n" + "="*70)
print("TEST 2: Rule with Security Profiles")
print("="*70)

result2 = security_app.invoke({
    "name": "Secure-Web-Access",
    "folder": "Texas",
    "from_": ["trust"],
    "to_": ["untrust"],
    "source": ["internal-net"],
    "destination": ["any"],
    "application": ["web-browsing", "ssl"],
    "service": ["application-default"],
    "action": "allow",
    "description": "Web access with security profiles",
    "tag": ["Production", "Security"],
    "validated": False,
    "log_end": True,
    "profile_setting": {"group": ["best-practice"]},  # Nested dictionary!
    "result": "",
    "errors": []
})

print(result2["result"])
print(f"\nProfile Setting: {result2['profile_setting']}")

# Test 3: Invalid rule (missing required fields)
print("\n" + "="*70)
print("TEST 3: Invalid Rule (Missing Required Fields)")
print("="*70)

result3 = security_app.invoke({
    "name": "",  # Missing name!
    "folder": "Texas",
    "from_": [],  # Empty zones!
    "to_": ["untrust"],
    "source": ["any"],
    "destination": ["any"],
    "application": [],  # Empty applications!
    "service": ["application-default"],
    "action": "invalid_action",  # Invalid action!
    "description": None,
    "tag": None,
    "validated": False,
    "log_end": False,
    "profile_setting": None,
    "result": "",
    "errors": []
})

print(result3["result"])
print(f"\nValidated: {result3['validated']}")
print(f"Errors found: {len(result3['errors'])}")
for error in result3['errors']:
    print(f"  - {error}")

print("\n" + "="*70)
print("✅ All Tests Complete!")
print("="*70)

print("\n💡 Key Observations:")
print("   1. ✅ Test 1: All required fields provided → validation passed")
print("   2. ✅ Test 2: Optional profile_setting (nested dict) handled correctly")
print("   3. ❌ Test 3: Missing/invalid fields caught by validation")
print("\n📚 These examples match docs/examples/security_policy.py structure!")

### 7.7 Accessing Only the Result

If you only want the computed result (not all the fields), you can access it directly:

In [None]:
# Access just the result field from the last test
print("Clean output - accessing just the result:")
print(result3["result"])

print("\n💡 This is cleaner when you only need the computed result!")
print("💡 You can access any field from the final state: result3['validated'], result3['errors'], etc.")

---

### 7.7 Working with Different Field Types

Now let's explore how to work with each data type in detail. Understanding these patterns is critical for production SCM automation!
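
One subtlety worth seeing before the examples: `dict.get(key, default)` falls back to the default only when the key is missing, not when the key is present with the value `None`. Since `Optional` fields are often passed explicitly as `None`, the sketch below compares the access patterns on two small hypothetical dictionaries (`rule_with_none` and `rule_without_key` are illustrative names, not part of the notebook's schema).

```python
# Two hypothetical states: one sets description to None, one omits the key
rule_with_none = {"name": "test-rule", "description": None}
rule_without_key = {"name": "test-rule"}

fallback = "No description provided"

# Key missing: the default is used
missing_key = rule_without_key.get("description", fallback)

# Key present but None: the default is NOT used
explicit_none = rule_with_none.get("description", fallback)

# `or` covers both cases (it also replaces other falsy values such as "")
with_or = rule_with_none.get("description") or fallback

print(missing_key)    # No description provided
print(explicit_none)  # None
print(with_or)        # No description provided
```

The `or` form is convenient for display strings. When an empty string or empty list is a meaningful value, an explicit `is None` check is the safer choice.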

#### 7.7.1 String Fields (Required vs Optional)

String fields can be required or optional. Here's how to handle both:

In [None]:
# Example: Working with Optional string fields

def process_optional_fields(state: SecurityRuleState) -> dict:
    """Demonstrate safe handling of Optional string fields."""
    
    # ❌ UNSAFE - raises TypeError if description is None
    # result = f"Description length: {len(state['description'])}"
    
    # ✅ SAFE - .get() handles a missing key; `or` also covers an explicit None
    description = state.get("description") or "No description provided"
    result = f"Description: {description}"
    
    # ✅ SAFE - check before accessing
    if state.get("description"):
        result += f"\n  Length: {len(state['description'])} characters"
    else:
        result += "\n  (No description)"
    
    return {"result": result}

# Test with description
test_state = {
    "name": "test-rule",
    "folder": "Texas",
    "from_": ["trust"],
    "to_": ["untrust"],
    "source": ["any"],
    "destination": ["any"],
    "application": ["web-browsing"],
    "service": ["application-default"],
    "action": "allow",
    "description": "This is a test rule",  # Has description
    "tag": None,
    "validated": False,
    "log_end": False,
    "profile_setting": None,
    "result": "",
    "errors": []
}

result = process_optional_fields(test_state)
print("With description:")
print(result["result"])

# Test without description
test_state["description"] = None
result = process_optional_fields(test_state)
print("\nWithout description:")
print(result["result"])

print("\n💡 Key Pattern: Always use .get() for Optional fields!")

#### 7.7.2 Boolean Fields for Workflow Control

Boolean fields are essential for tracking workflow state (validated, created, enabled, etc.):

In [None]:
# Example: Using Boolean fields for workflow control

def check_rule_status(state: SecurityRuleState) -> dict:
    """Demonstrate using Boolean fields to control workflow logic."""
    
    # Boolean field: validated
    if state["validated"]:
        status = "✅ Rule has been validated and is ready for deployment"
    else:
        status = "⚠️  Rule needs validation before deployment"
    
    # Optional Boolean field: log_end (with safe access)
    logging_status = state.get("log_end", False)
    if logging_status:
        status += "\n  📊 Session-end logging is ENABLED"
    else:
        status += "\n  📊 Session-end logging is DISABLED"
    
    # Business logic based on boolean values
    can_deploy = state["validated"] and not state.get("errors", [])
    
    if can_deploy:
        status += "\n  🚀 READY TO DEPLOY"
    else:
        status += "\n  🛑 NOT READY - fix errors first"
    
    return {"result": status}

# Test 1: Validated rule with logging
print("="*60)
print("TEST 1: Validated rule with logging enabled")
print("="*60)
test1 = {
    "name": "web-rule",
    "folder": "Texas",
    "from_": ["trust"],
    "to_": ["untrust"],
    "source": ["any"],
    "destination": ["any"],
    "application": ["web-browsing"],
    "service": ["application-default"],
    "action": "allow",
    "description": None,
    "tag": None,
    "validated": True,  # ← Boolean: True
    "log_end": True,    # ← Optional Boolean: True
    "profile_setting": None,
    "result": "",
    "errors": []
}

result1 = check_rule_status(test1)
print(result1["result"])

# Test 2: Unvalidated rule
print("\n" + "="*60)
print("TEST 2: Unvalidated rule")
print("="*60)
test2 = test1.copy()
test2["validated"] = False  # ← Boolean: False
test2["log_end"] = False    # ← Optional Boolean: False

result2 = check_rule_status(test2)
print(result2["result"])

print("\n💡 Boolean fields enable workflow control:")
print("   - Track validation status")
print("   - Control deployment readiness")
print("   - Configure logging options")
print("   - Enable/disable features")

#### 7.7.3 Dictionary Fields for Nested Configuration

Dictionary fields handle complex nested configurations like security profiles:

In [None]:
# Example: Working with nested dictionary fields

def process_security_profiles(state: SecurityRuleState) -> dict:
    """Demonstrate accessing and updating nested dictionary configurations.
    
    References docs/examples/security_policy.py lines 35-36:
        "profile_setting": {"group": ["best-practice"]}
    """
    
    # ❌ UNSAFE - raises TypeError if profile_setting is None
    # groups = state["profile_setting"]["group"]
    
    # ✅ SAFE - `or {}` covers both a missing key and an explicit None
    profile_setting = state.get("profile_setting") or {}
    
    if profile_setting:
        # Access nested values safely
        groups = profile_setting.get("group", [])
        
        result = f"Security Profiles Applied:\n"
        result += f"  Profile Type: Security Group\n"
        result += f"  Groups: {', '.join(groups)}\n"
        result += f"  Count: {len(groups)} profile(s)"
        
        # Check specific profiles
        if "best-practice" in groups:
            result += "\n  ✅ Best-practice security enabled"
        if "strict-security" in groups:
            result += "\n  ✅ Strict security enabled"
    else:
        result = "No security profiles configured\n"
        result += "  ⚠️  Consider adding security profiles for enhanced protection"
    
    return {"result": result}

# Test 1: Rule with security profiles (matches docs/examples/security_policy.py)
print("="*70)
print("TEST 1: Rule with Best-Practice Security Profile")
print("="*70)
test1 = {
    "name": "secure-web",
    "folder": "Texas",
    "from_": ["trust"],
    "to_": ["untrust"],
    "source": ["internal-net"],
    "destination": ["any"],
    "application": ["web-browsing", "ssl"],
    "service": ["application-default"],
    "action": "allow",
    "description": "Secure web access with profiles",
    "tag": ["Production", "Security"],
    "validated": True,
    "log_end": True,
    "profile_setting": {"group": ["best-practice"]},  # ← Nested dictionary!
    "result": "",
    "errors": []
}

result1 = process_security_profiles(test1)
print(result1["result"])

# Test 2: Rule with multiple security profiles
print("\n" + "="*70)
print("TEST 2: Rule with Multiple Security Profiles")
print("="*70)
test2 = test1.copy()
test2["profile_setting"] = {"group": ["best-practice", "strict-security"]}

result2 = process_security_profiles(test2)
print(result2["result"])

# Test 3: Rule without security profiles
print("\n" + "="*70)
print("TEST 3: Rule Without Security Profiles")
print("="*70)
test3 = test1.copy()
test3["profile_setting"] = None  # ← No profiles

result3 = process_security_profiles(test3)
print(result3["result"])

print("\n💡 Dictionary fields pattern:")
print("   1. Use `.get(...) or {}` so a missing key and None both fall back to an empty dict")
print("   2. Access nested values with .get() again")
print("   3. Check if dictionary exists before accessing")
print("\n📚 This matches docs/examples/security_policy.py lines 35-36!")

#### 7.7.4 List Field Operations (Append, Replace, Filter)

List fields are common in SCM configurations. Here's how to manipulate them:

In [None]:
# Example: List field operations (append, replace, filter)

def modify_applications(state: SecurityRuleState) -> dict:
    """Demonstrate three common list operations."""
    
    current_apps = state.get("application", [])
    
    print(f"Original applications: {current_apps}")
    
    # Operation 1: APPEND - Add new applications to existing list
    append_apps = current_apps + ["http2", "ssl"]
    print(f"\n1. APPEND (add http2, ssl):")
    print(f"   Result: {append_apps}")
    
    # Operation 2: REPLACE - Completely replace the list
    replace_apps = ["any"]
    print(f"\n2. REPLACE (with 'any'):")
    print(f"   Result: {replace_apps}")
    
    # Operation 3: FILTER - Remove specific applications
    # Remove 'ssl' if it exists
    filter_apps = [app for app in current_apps if app != "ssl"]
    print(f"\n3. FILTER (remove 'ssl'):")
    print(f"   Result: {filter_apps}")
    
    # Operation 4: DEDUPLICATE - Remove duplicates
    apps_with_dupes = current_apps + ["web-browsing", "ssl"]  # Add duplicates
    dedup_apps = list(set(apps_with_dupes))
    print(f"\n4. DEDUPLICATE:")
    print(f"   With dupes: {apps_with_dupes}")
    print(f"   After dedup: {dedup_apps}")
    
    # For this demo, return the appended version
    return {"application": append_apps}

# Test list operations
print("="*70)
print("LIST OPERATIONS DEMO")
print("="*70)

test_state = {
    "name": "web-rule",
    "folder": "Texas",
    "from_": ["trust"],
    "to_": ["untrust"],
    "source": ["any"],
    "destination": ["any"],
    "application": ["web-browsing", "ssl"],  # Starting list
    "service": ["application-default"],
    "action": "allow",
    "description": None,
    "tag": None,
    "validated": False,
    "log_end": False,
    "profile_setting": None,
    "result": "",
    "errors": []
}

result = modify_applications(test_state)

print("\n" + "="*70)
print("💡 List Operation Patterns:")
print("="*70)
print("   APPEND:      existing_list + [new_items]")
print("   REPLACE:     new_list")
print("   FILTER:      [item for item in list if condition]")
print("   DEDUPLICATE: list(set(list_with_dupes))")
print("\n   These patterns work for:")
print("   - from_ / to_ zones")
print("   - source / destination addresses")
print("   - application lists")
print("   - service lists")
print("   - tag lists")
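
A caveat on the deduplication idiom: `list(set(...))` removes duplicates but does not guarantee that the original order is kept. When order matters (for example, to keep printed output or configuration diffs stable), `dict.fromkeys` is a common order-preserving alternative. A minimal sketch with illustrative values:

```python
# Illustrative input containing repeated application names
apps_with_dupes = ["web-browsing", "ssl", "http2", "web-browsing", "ssl"]

# dict keys are unique and keep insertion order, so the first occurrence wins
ordered_unique = list(dict.fromkeys(apps_with_dupes))

print(ordered_unique)  # ['web-browsing', 'ssl', 'http2']
```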

#### 7.7.5 Error Handling with Try/Except

Error handling is essential for production workflows. Let's see how to use `try`/`except` with state-based error tracking:

In [None]:
# Example: Error handling with try/except for SCM operations

def create_security_rule_with_error_handling(state: SecurityRuleState) -> dict:
    """Demonstrate production error handling pattern for SCM operations.
    
    This shows how you would handle SCM exceptions in production.
    (Not executed - demonstrates the pattern for Notebooks 105-107)
    """
    errors = []
    
    try:
        # Simulate validation
        rule_name = state["name"]
        
        if not rule_name:
            raise ValueError("Rule name cannot be empty")
        
        if len(rule_name) > 63:
            raise ValueError("Rule name must be 63 characters or less")
        
        # In production, this would be:
        # from scm.exceptions import NameNotUniqueError, InvalidObjectError
        # 
        # try:
        #     rule = client.security_rule.create({
        #         "name": state["name"],
        #         "from_": state["from_"],
        #         "to_": state["to_"],
        #         "source": state["source"],
        #         "destination": state["destination"],
        #         "application": state["application"],
        #         "service": state["service"],
        #         "action": state["action"],
        #         "folder": state["folder"]
        #     })
        #     result = f"✅ Rule created successfully (ID: {rule.id})"
        #     validated = True
        # 
        # except NameNotUniqueError:
        #     errors.append(f"Rule name '{state['name']}' already exists")
        #     result = "❌ Creation failed - duplicate name"
        #     validated = False
        # 
        # except InvalidObjectError as e:
        #     errors.append(f"Invalid configuration: {e.message}")
        #     result = "❌ Creation failed - invalid data"
        #     validated = False
        
        # For this demo, simulate success
        result = f"✅ Rule '{rule_name}' passed validation"
        validated = True
        
    except ValueError as e:
        # Validation rule failed
        errors.append(str(e))
        result = f"❌ Validation failed: {str(e)}"
        validated = False
        
    except Exception as e:
        # Catch-all for unexpected errors
        errors.append(f"Unexpected error: {str(e)}")
        result = f"❌ Unexpected error: {type(e).__name__}"
        validated = False
    
    return {
        "result": result,
        "validated": validated,
        "errors": errors
    }


print("💡 Error Handling Pattern for Production:")
print("="*70)
print("\nKey Patterns:")
print("  1. Wrap SCM operations in try/except blocks")
print("  2. Track errors in state (errors: List[str] field)")
print("  3. Handle specific SCM exceptions separately:")
print("     - NameNotUniqueError: Object name already exists")
print("     - InvalidObjectError: Configuration data format wrong")
print("     - ObjectNotPresentError: Object not found (404)")
print("     - MissingQueryParameterError: Required parameter missing")
print("     - ReferenceNotZeroError: Can't delete - still referenced")
print("  4. Always return valid state (even on error)")
print("  5. Use validated/errors fields to control workflow routing")
print("\n📚 This pattern will be used extensively in Notebooks 105-107!")

# Test the error handling
print("\n" + "="*70)
print("Testing Error Handling")
print("="*70)

# Test 1: Valid rule name
print("\nTest 1: Valid rule name")
test1 = {
    "name": "Allow-Web-Traffic",
    "folder": "Texas",
    "from_": ["trust"],
    "to_": ["untrust"],
    "source": ["any"],
    "destination": ["any"],
    "application": ["web-browsing"],
    "service": ["application-default"],
    "action": "allow",
    "description": None,
    "tag": None,
    "validated": False,
    "log_end": False,
    "profile_setting": None,
    "result": "",
    "errors": []
}

result1 = create_security_rule_with_error_handling(test1)
print(f"Result: {result1['result']}")
print(f"Validated: {result1['validated']}")
print(f"Errors: {result1['errors']}")

# Test 2: Empty rule name (validation error)
print("\nTest 2: Empty rule name (should fail)")
test2 = test1.copy()
test2["name"] = ""

result2 = create_security_rule_with_error_handling(test2)
print(f"Result: {result2['result']}")
print(f"Validated: {result2['validated']}")
print(f"Errors: {result2['errors']}")

# Test 3: Name too long (validation error)
print("\nTest 3: Name too long (should fail)")
test3 = test1.copy()
test3["name"] = "a" * 64  # 64 characters - exceeds 63 limit

result3 = create_security_rule_with_error_handling(test3)
print(f"Result: {result3['result']}")
print(f"Validated: {result3['validated']}")
print(f"Errors: {result3['errors']}")

print("\n" + "="*70)
print("💡 Key Takeaways:")
print("="*70)
print("  1. Try/except ensures nodes always return valid state")
print("  2. Errors are tracked in state for later analysis")
print("  3. validated field enables conditional routing (Notebook 106)")
print("  4. Specific exception types allow targeted error handling")
print("  5. This pattern scales to production SCM operations")
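
Takeaway 3 mentions routing on the `validated` field. As a preview, a routing function could look like the sketch below. The function name `route_after_validation` and the branch labels `"deploy"` and `"fix_errors"` are hypothetical, chosen only for illustration. How such a function is wired into a graph is left to the later notebook.

```python
def route_after_validation(state: dict) -> str:
    """Return the (hypothetical) name of the next step based on state flags."""
    # Proceed only when the rule was validated and no errors were recorded
    if state.get("validated") and not state.get("errors"):
        return "deploy"
    return "fix_errors"


ok_state = {"validated": True, "errors": []}
bad_state = {"validated": False, "errors": ["Rule name cannot be empty"]}

print(route_after_validation(ok_state))   # deploy
print(route_after_validation(bad_state))  # fix_errors
```

Because every node in this section returns `validated` and `errors` even on failure, a function like this always has the fields it needs to make a decision.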

<a id='state-initialization'></a>

### 7.8 Understanding State Initialization

Let's explore an important concept: **How LangGraph handles fields you don't provide as input**.

We'll add some print statements to see the state before and after processing:

In [None]:
# Create a new version with debug print statements
def validate_security_rule_debug(state: SecurityRuleState) -> dict:
    """Node: Validate security rule with debug output."""
    
    print("📥 STATE BEFORE PROCESSING:")
    print(f"   name: {state.get('name', 'NOT PROVIDED')}")
    print(f"   from_: {state.get('from_', 'NOT PROVIDED')}")
    print(f"   to_: {state.get('to_', 'NOT PROVIDED')}")
    print(f"   result: {state.get('result', 'NOT PROVIDED')}")
    print()
    
    # Read values from state
    rule_name = state["name"]
    source_zones = state["from_"]
    dest_zones = state["to_"]
    
    # Calculate totals
    total_zones = len(source_zones) + len(dest_zones)
    
    # Generate report
    report = f"Rule '{rule_name}' validated: {len(source_zones)} source zones, {len(dest_zones)} destination zones (total: {total_zones} zone references)"
    
    print("📤 RETURNING PARTIAL UPDATE:")
    print(f"   result: {report}")
    print(f"   validated: True")
    print()
    
    # Return partial update
    return {"result": report, "validated": True}

# Build new graph with debug version
debug_graph = StateGraph(SecurityRuleState)
debug_graph.add_node("validate", validate_security_rule_debug)
debug_graph.add_edge(START, "validate")
debug_graph.add_edge("validate", END)
debug_app = debug_graph.compile()

# Invoke - notice we're providing all required fields!
print("="*60)
print("INVOKING GRAPH - Providing all required SecurityRuleState fields")
print("="*60)
result = debug_app.invoke({
    "name": "DMZ-Access-Policy",
    "folder": "Texas",
    "from_": ["dmz"],
    "to_": ["trust", "untrust"],
    "source": ["any"],
    "destination": ["any"],
    "application": ["any"],
    "service": ["application-default"],
    "action": "allow",
    "description": None,
    "tag": None,
    "validated": False,
    "log_end": False,
    "profile_setting": None,
    "result": "",
    "errors": []
})

print("="*60)
print("FINAL RESULT:")
print(f"  {result['result']}")
print(f"  validated: {result['validated']}")
print("="*60)

### 7.9 Key Observation: Uninitialized Fields

Notice what happened in the output above:

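In the run above we passed `result` as an empty string, so **STATE BEFORE PROCESSING** prints an empty value for it. Had we left `result` out of the input entirely, LangGraph would not have crashed: the key would simply be absent from the state, and the debug line would print `NOT PROVIDED` because it reads the field with `state.get()`.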
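To make the missing-field and partial-update behavior concrete, here is a plain-Python model of it. This is a sketch, not LangGraph's internals: `RuleState` is a hypothetical trimmed-down stand-in for `SecurityRuleState`, and `apply_update` assumes the default case where no reducer is attached to a field, so a returned key simply overwrites the old value.

```python
from typing import TypedDict


class RuleState(TypedDict, total=False):
    """Hypothetical, trimmed-down stand-in for SecurityRuleState."""
    name: str
    result: str
    validated: bool


def apply_update(state: RuleState, update: dict) -> dict:
    """Model of a partial update: returned keys overwrite, all others persist."""
    return {**state, **update}


# 'result' is deliberately left out of the input.
initial: RuleState = {"name": "DMZ-Access-Policy", "validated": False}
print(initial.get("result", "NOT PROVIDED"))  # NOT PROVIDED

# The node returns only the fields it changed.
final = apply_update(initial, {"result": "ok", "validated": True})
print(final["name"], final["result"], final["validated"])
```

The untouched `name` field survives the update, while `result` appears only once something has written it.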

⚠️ **Important Warning**: Be careful when using state fields in your logic!

If you index a field that was never provided as input or set by an earlier node, `state["field"]` raises a `KeyError`. For example:

```python
# ❌ DANGEROUS - What if 'result' wasn't provided?
def bad_node(state: FirewallMetricsState) -> dict:
    old_result = state["result"]  # KeyError if 'result' was never set!
    new_result = old_result + " - Updated"  # TypeError if 'result' is None
    return {"result": new_result}
```

**Safe Pattern**:
```python
# ✅ SAFE - Only assign to fields, don't read uninitialized ones
def good_node(state: FirewallMetricsState) -> dict:
    # Read fields you KNOW were provided
    hostname = state["hostname"]
    metrics = state["interface_metrics"]
    
    # Compute NEW value for result (don't read old value)
    result = f"Report for {hostname}: {sum(metrics)}%"
    return {"result": result}
```

💡 **Rule of Thumb**: Only read from state fields that were either:
1. Provided as initial input, OR
2. Set by a previous node in the workflow
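When a node genuinely needs the previous value of a field that may not exist yet, one option is to read it defensively. The sketch below is plain Python with no LangGraph dependency; `MetricsState` and `append_note` are hypothetical names standing in for `FirewallMetricsState` and a node function.

```python
from typing import TypedDict


class MetricsState(TypedDict, total=False):
    """Hypothetical stand-in for FirewallMetricsState."""
    hostname: str
    interface_metrics: list[int]
    result: str


def append_note(state: MetricsState) -> dict:
    """Extend 'result' whether or not an earlier step has set it."""
    previous = state.get("result") or ""   # covers a missing key and None
    hostname = state["hostname"]            # provided as input, safe to index
    note = f"Report for {hostname}: {sum(state['interface_metrics'])}%"
    combined = f"{previous} | {note}" if previous else note
    return {"result": combined}


# First call: 'result' does not exist yet.
first = append_note({"hostname": "fw-01", "interface_metrics": [10, 20]})
# Second call: 'result' carries over from the first update.
second = append_note({"hostname": "fw-01", "interface_metrics": [5], **first})
print(first["result"])
print(second["result"])
```

Fields that are guaranteed by the input are still indexed directly, so a genuinely missing required field fails loudly instead of being masked by a default.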

<a id='exercise'></a>

---

## 7.10 Exercise: SCM Address Object Type Operations

Time for a more challenging exercise! This builds on everything you've learned.

**Challenge**: Create an SCM address object validator that supports different address types

**Requirements**:
- Create a state with FOUR fields:
  - `address_name` (str): Address object name
  - `address_values` (List[str]): List of IP addresses or FQDNs to validate
  - `address_type` (str): Either "ip_netmask" or "fqdn"
  - `result` (str): The validation result

- Build a node that:
  - Reads all input fields
  - If address_type is "ip_netmask", count total IP addresses
  - If address_type is "fqdn", count total FQDNs
  - Format a validation report with the result

- Use an **if statement** in your node to handle the two address types

**Example Input/Output**:

```python
# Input 1:
{
    "address_name": "internal_networks",
    "address_values": ["192.168.1.0/24", "10.0.0.0/8", "172.16.0.0/12"],
    "address_type": "ip_netmask"
}

# Output 1:
"Address object 'internal_networks' validated: 3 IP/netmask addresses"

# Input 2:
{
    "address_name": "trusted_sites",
    "address_values": ["example.com", "test.org", "demo.net"],
    "address_type": "fqdn"
}

# Output 2:
"Address object 'trusted_sites' validated: 3 FQDN addresses"
```

**Hints**:
- For count: use `len(address_values)`
- Use an if/else statement to check the address_type
- Remember to include docstrings!

**Steps**:
1. Define `AddressTypeState` with 4 fields
2. Create `validate_address_type` node function with if/else logic
3. Build graph with StateGraph
4. Add node, set entry/finish points
5. Compile
6. Visualize with IPython
7. Test with both "ip_netmask" and "fqdn" types

Try it yourself below!

In [None]:
# SOLUTION: SCM Address Object Type Operations

from typing import TypedDict, List
from langgraph.graph import StateGraph, START, END

# Step 1: Define AddressTypeState with 4 fields
class AddressTypeState(TypedDict):
    """State for validating different address object types.
    
    References:
    - docs/examples/ address object patterns
    - SCM API supports: ip_netmask, ip_range, ip_wildcard, fqdn
    """
    address_name: str                # Address object name
    address_values: List[str]        # List of IP addresses or FQDNs
    address_type: str                # "ip_netmask" or "fqdn"
    result: str                      # Validation result

print("✅ AddressTypeState defined!")
print("\nState fields:")
print("  - address_name: str (name of the address object)")
print("  - address_values: List[str] (list of addresses)")
print("  - address_type: str ('ip_netmask' or 'fqdn')")
print("  - result: str (validation result)")

# Step 2: Create validation node with if/else logic
def validate_address_type(state: AddressTypeState) -> dict:
    """Node: Validate address object based on type.
    
    Supports two address types:
    - ip_netmask: IP addresses with subnet masks (e.g., "192.168.1.0/24")
    - fqdn: Fully qualified domain names (e.g., "example.com")
    
    Args:
        state: Current state with address information
        
    Returns:
        dict: Partial state update with validation result
    """
    address_name = state["address_name"]
    address_values = state["address_values"]
    address_type = state["address_type"]
    count = len(address_values)
    
    # Conditional logic based on address type
    if address_type == "ip_netmask":
        result = f"✅ Address object '{address_name}' validated: {count} IP/netmask address(es)"
    elif address_type == "fqdn":
        result = f"✅ Address object '{address_name}' validated: {count} FQDN address(es)"
    else:
        result = f"❌ Address object '{address_name}' validation failed: unknown type '{address_type}'"
    
    return {"result": result}

print("\n✅ validate_address_type function defined!")
print("\n💡 This node demonstrates:")
print("   1. Reading multiple field types (str, List[str])")
print("   2. If/else conditional logic")
print("   3. Dynamic result generation based on input")
print("   4. Partial state updates (returning only 'result')")

# Step 3-6: Build graph
address_graph = StateGraph(AddressTypeState)
address_graph.add_node("validate", validate_address_type)
address_graph.add_edge(START, "validate")
address_graph.add_edge("validate", END)
address_app = address_graph.compile()

print("\n✅ Address validation graph built and compiled!")

# Step 7: Visualize
from IPython.display import Image, display
print("\n📊 Graph Visualization:")
display(Image(address_app.get_graph().draw_mermaid_png()))

# Step 8: Test with "ip_netmask"
print("\n" + "="*70)
print("TEST 1: IP/Netmask Address Type")
print("="*70)
result1 = address_app.invoke({
    "address_name": "internal_networks",
    "address_values": ["192.168.1.0/24", "10.0.0.0/8", "172.16.0.0/12"],
    "address_type": "ip_netmask",
    "result": ""
})
print(result1["result"])
print(f"\nAddress count: {len(result1['address_values'])}")

# Step 9: Test with "fqdn"
print("\n" + "="*70)
print("TEST 2: FQDN Address Type")
print("="*70)
result2 = address_app.invoke({
    "address_name": "trusted_sites",
    "address_values": ["example.com", "test.org", "demo.net"],
    "address_type": "fqdn",
    "result": ""
})
print(result2["result"])
print(f"\nAddress count: {len(result2['address_values'])}")

# Step 10: Test with invalid type
print("\n" + "="*70)
print("TEST 3: Invalid Address Type (Error Handling)")
print("="*70)
result3 = address_app.invoke({
    "address_name": "invalid_test",
    "address_values": ["value1", "value2"],
    "address_type": "invalid_type",
    "result": ""
})
print(result3["result"])

print("\n" + "="*70)
print("✅ Exercise Complete!")
print("="*70)
print("\n💡 Key Takeaways:")
print("   1. ✅ Complex state with 4 fields (2 str, 1 List[str], 1 str result)")
print("   2. ✅ Conditional logic (if/elif/else) based on address_type")
print("   3. ✅ List field handling (accessing and counting)")
print("   4. ✅ Error handling for invalid types")
print("   5. ✅ Real-world SCM address object validation pattern")
print("\n📚 This pattern matches SCM address object validation!")
print("   Next: Try creating a NAT rule validator (Exercise 7.11)")

---

## 7.11 Advanced Exercise: NAT Policy State Validation

Now that you've completed the address type exercise, let's tackle a more complex real-world scenario: **NAT policy validation**.

**Challenge**: Create a NAT rule validator that handles different translation types

NAT rules in SCM can have multiple translation types:
- **Dynamic IP and Port (PAT)** - Multiple internal IPs share public IPs
- **Static IP** - 1:1 IP mapping
- **Destination Translation** - Port forwarding

**Requirements**:
- Create a state with these fields:
  - `name` (str): NAT rule name
  - `nat_type` (str): Either "ipv4" or "nat64"
  - `source` (List[str]): Source addresses
  - `destination` (List[str]): Destination addresses
  - `translation_type` (str): "dynamic_ip_and_port", "static_ip", or "destination"
  - `translated_addresses` (Optional[List[str]]): Public IP pool (for dynamic)
  - `static_address` (Optional[str]): 1:1 mapped IP (for static)
  - `bi_directional` (Optional[bool]): Allow inbound for static NAT
  - `result` (str): Validation result

- Build a node that:
  - Validates required fields
  - Uses if/elif/else for different translation types
  - Handles optional fields safely
  - Returns comprehensive validation results

This exercise combines everything you've learned:
- Multiple field types (str, List[str], Optional[str], Optional[bool])
- Complex conditional logic
- Safe optional field access
- Real SCM API structure (from docs/examples/nat_policy.py)
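One detail of safe optional access is easy to miss: `dict.get(key, default)` only uses the default when the key is absent, not when the key is present with a value of `None`. The sketch below illustrates the difference in plain Python; `PoolState` is a hypothetical two-field state used only for this demonstration.

```python
from typing import Optional, TypedDict


class PoolState(TypedDict):
    """Hypothetical two-field state; only the Optional field matters here."""
    name: str
    translated_addresses: Optional[list[str]]


state: PoolState = {"name": "Invalid-PAT", "translated_addresses": None}

# The default is ignored because the key exists; its value is simply None.
with_default = state.get("translated_addresses", [])

# 'or' falls back for both a missing key and an explicit None.
with_or = state.get("translated_addresses") or []

print(with_default)  # None
print(with_or)       # []
```

If later code iterates over the value or joins it into a string, the `or []` form guarantees a list in every case.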

**Reference**: Check [docs/examples/nat_policy.py](../docs/examples/nat_policy.py) lines 9-24 and 31-44 for actual NAT rule structure.

Try it yourself below!

In [None]:
# SOLUTION: NAT Policy State Validation

from typing import TypedDict, List, Optional
from langgraph.graph import StateGraph, START, END

# Step 1: Define NATRuleState with 9 fields
class NATRuleState(TypedDict):
    """State for NAT rule validation - matches docs/examples/nat_policy.py structure.
    
    References:
    - docs/examples/nat_policy.py lines 9-24 (dynamic IP and port)
    - docs/examples/nat_policy.py lines 31-44 (static IP translation)
    """
    # Basic identification
    name: str                               # NAT rule name
    nat_type: str                           # "ipv4" or "nat64"
    
    # Source and destination
    source: List[str]                       # Source addresses
    destination: List[str]                  # Destination addresses
    
    # Translation configuration
    translation_type: str                   # "dynamic_ip_and_port", "static_ip", "destination"
    
    # Optional fields (vary by translation_type)
    translated_addresses: Optional[List[str]]  # Public IP pool (for dynamic)
    static_address: Optional[str]              # 1:1 mapped IP (for static)
    bi_directional: Optional[bool]             # Allow inbound (for static)
    
    # Result
    result: str                             # Validation result

print("✅ NATRuleState defined!")
print("\nState fields:")
print("  - name: str (NAT rule name)")
print("  - nat_type: str ('ipv4' or 'nat64')")
print("  - source: List[str] (source addresses)")
print("  - destination: List[str] (destination addresses)")
print("  - translation_type: str ('dynamic_ip_and_port', 'static_ip', 'destination')")
print("  - translated_addresses: Optional[List[str]] (for dynamic)")
print("  - static_address: Optional[str] (for static)")
print("  - bi_directional: Optional[bool] (for static)")
print("  - result: str (validation result)")

# Step 2: Create validation node with complex conditional logic
def validate_nat_rule(state: NATRuleState) -> dict:
    """Node: Validate NAT rule based on translation type.
    
    Handles three translation types with different requirements:
    - dynamic_ip_and_port: Requires translated_addresses (IP pool)
    - static_ip: Requires static_address, optional bi_directional
    - destination: Destination NAT/port forwarding
    
    Args:
        state: Current state with NAT rule information
        
    Returns:
        dict: Partial state update with validation result
    """
    name = state["name"]
    nat_type = state["nat_type"]
    source = state["source"]
    destination = state["destination"]
    translation_type = state["translation_type"]
    
    # Validate basic fields
    if not name:
        return {"result": "❌ NAT rule name is required"}
    if nat_type not in ["ipv4", "nat64"]:
        return {"result": f"❌ NAT type must be 'ipv4' or 'nat64', got '{nat_type}'"}
    if not source:
        return {"result": "❌ At least one source address is required"}
    if not destination:
        return {"result": "❌ At least one destination address is required"}
    
    # Conditional validation based on translation_type
    if translation_type == "dynamic_ip_and_port":
        # Dynamic IP and Port (PAT) - requires translated_addresses
        translated_addrs = state.get("translated_addresses") or []  # handles a missing key and None
        if not translated_addrs:
            return {"result": "❌ Dynamic IP and Port translation requires translated_addresses (public IP pool)"}
        
        result = f"✅ NAT Rule '{name}' (Dynamic IP and Port) validated:\n"
        result += f"  Type: {nat_type}\n"
        result += f"  Source addresses: {len(source)}\n"
        result += f"  Destination addresses: {len(destination)}\n"
        result += f"  Translation: PAT using {len(translated_addrs)} public IP(s)\n"
        result += f"  Public IPs: {', '.join(translated_addrs)}"
        
    elif translation_type == "static_ip":
        # Static IP (1:1 NAT) - requires static_address
        static_addr = state.get("static_address")
        if not static_addr:
            return {"result": "❌ Static IP translation requires static_address (1:1 mapping)"}
        
        bi_dir = state.get("bi_directional", False)
        
        result = f"✅ NAT Rule '{name}' (Static IP) validated:\n"
        result += f"  Type: {nat_type}\n"
        result += f"  Source addresses: {len(source)}\n"
        result += f"  Destination addresses: {len(destination)}\n"
        result += f"  Translation: 1:1 mapping to {static_addr}\n"
        result += f"  Bi-directional: {'enabled (allows inbound)' if bi_dir else 'disabled (outbound only)'}"
        
    elif translation_type == "destination":
        # Destination NAT (port forwarding)
        result = f"✅ NAT Rule '{name}' (Destination NAT) validated:\n"
        result += f"  Type: {nat_type}\n"
        result += f"  Source addresses: {len(source)}\n"
        result += f"  Destination addresses: {len(destination)}\n"
        result += f"  Translation: Destination NAT (port forwarding)"
        
    else:
        return {"result": f"❌ Unknown translation type '{translation_type}'. Must be 'dynamic_ip_and_port', 'static_ip', or 'destination'"}
    
    return {"result": result}

print("\n✅ validate_nat_rule function defined!")
print("\n💡 This node demonstrates:")
print("   1. Complex multi-field state (9 fields with 4 data types)")
print("   2. If/elif/else for multiple translation types")
print("   3. Safe handling of Optional fields based on translation_type")
print("   4. Validation that varies by configuration type")
print("   5. Real SCM NAT rule structure from docs/examples/nat_policy.py")

# Step 3-6: Build graph
nat_graph = StateGraph(NATRuleState)
nat_graph.add_node("validate", validate_nat_rule)
nat_graph.add_edge(START, "validate")
nat_graph.add_edge("validate", END)
nat_app = nat_graph.compile()

print("\n✅ NAT rule validation graph built and compiled!")

# Step 7: Visualize
from IPython.display import Image, display
print("\n📊 Graph Visualization:")
display(Image(nat_app.get_graph().draw_mermaid_png()))

# Step 8: Test Dynamic IP and Port (PAT)
print("\n" + "="*70)
print("TEST 1: Dynamic IP and Port Translation (PAT)")
print("="*70)
result1 = nat_app.invoke({
    "name": "Internal-to-Internet-PAT",
    "nat_type": "ipv4",
    "source": ["192.168.0.0/16", "10.0.0.0/8"],
    "destination": ["any"],
    "translation_type": "dynamic_ip_and_port",
    "translated_addresses": ["203.0.113.10", "203.0.113.11"],  # Public IP pool
    "static_address": None,
    "bi_directional": None,
    "result": ""
})
print(result1["result"])

# Step 9: Test Static IP (1:1 NAT)
print("\n" + "="*70)
print("TEST 2: Static IP Translation (1:1 NAT with Bi-directional)")
print("="*70)
result2 = nat_app.invoke({
    "name": "DMZ-Web-Server-NAT",
    "nat_type": "ipv4",
    "source": ["192.168.1.10"],  # Internal web server
    "destination": ["any"],
    "translation_type": "static_ip",
    "translated_addresses": None,
    "static_address": "203.0.113.50",  # Public IP
    "bi_directional": True,  # Allow inbound connections
    "result": ""
})
print(result2["result"])

# Step 10: Test Destination NAT (Port Forwarding)
print("\n" + "="*70)
print("TEST 3: Destination NAT (Port Forwarding)")
print("="*70)
result3 = nat_app.invoke({
    "name": "Port-Forward-SSH",
    "nat_type": "ipv4",
    "source": ["any"],
    "destination": ["203.0.113.100"],  # Public IP
    "translation_type": "destination",
    "translated_addresses": None,
    "static_address": None,
    "bi_directional": None,
    "result": ""
})
print(result3["result"])

# Step 11: Test validation errors
print("\n" + "="*70)
print("TEST 4: Validation Error - Dynamic NAT Missing translated_addresses")
print("="*70)
result4 = nat_app.invoke({
    "name": "Invalid-PAT",
    "nat_type": "ipv4",
    "source": ["192.168.0.0/16"],
    "destination": ["any"],
    "translation_type": "dynamic_ip_and_port",
    "translated_addresses": None,  # ERROR: Missing public IP pool!
    "static_address": None,
    "bi_directional": None,
    "result": ""
})
print(result4["result"])

print("\n" + "="*70)
print("✅ Advanced Exercise Complete!")
print("="*70)
print("\n💡 Key Takeaways:")
print("   1. ✅ Complex state with 9 fields and 4 data types")
print("   2. ✅ Multi-level conditional logic (if/elif/else for 3 translation types)")
print("   3. ✅ Optional fields that vary by translation_type:")
print("      - dynamic_ip_and_port needs translated_addresses")
print("      - static_ip needs static_address (and optional bi_directional)")
print("      - destination needs neither")
print("   4. ✅ Validation ensures required fields match translation type")
print("   5. ✅ Real-world SCM NAT rule patterns from docs/examples/nat_policy.py")
print("\n📚 This demonstrates production-ready NAT validation!")
print("   You've now mastered complex state management with conditional Optional fields!")

<a id='real-world-example'></a>

---

## 8.5 Real-World Example: Complete SCM Configuration Workflow

Before we dive into multi-node graphs in Notebook 105, let's see a **real-world SCM configuration example** that shows why sequential workflows matter.

### The Challenge

In SCM, you often need to configure multiple related objects:
1. **Tags** - For organizing resources
2. **Address Objects** - IP addresses/networks
3. **Address Groups** - Collections of addresses

These have **dependencies**: groups reference addresses, both use tags.
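A dependency relationship like this can be turned into a creation order mechanically. As a minimal sketch, the standard-library `graphlib` module can sort the three object types so that each one comes after everything it references. The dictionary keys are illustrative labels, not SCM API names.

```python
from graphlib import TopologicalSorter

# Each key maps to the object types it depends on (illustrative labels).
dependencies = {
    "tag": set(),
    "address": {"tag"},
    "address_group": {"address", "tag"},
}

# static_order() yields every item after all of its dependencies.
creation_order = list(TopologicalSorter(dependencies).static_order())
print(creation_order)  # ['tag', 'address', 'address_group']
```

For three objects the order is obvious by inspection, but the same approach keeps working as more object types and references are added.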

Let's build a simple example showing this pattern!

#### Define SCM Configuration State

Our state tracks a tag, address object, and address group creation workflow:

In [None]:
from typing import TypedDict, List, Optional

class SCMWorkflowState(TypedDict):
    """State for SCM configuration workflow (tag → address → group)."""
    # Configuration
    folder: str
    tag_name: str
    address_name: str
    ip_netmask: str
    group_name: str

    # Results
    workflow_log: str

print("✅ SCMWorkflowState defined!")
print("\nThis state tracks: folder, tag_name, address_name, ip_netmask, group_name, workflow_log")

#### Production Pattern: ScmClient Initialization

Before we build the workflow, let's see how to initialize the SCM client in production. While we won't execute actual API calls in this notebook, understanding the initialization pattern is important:

```python
# Production Pattern: Initialize ScmClient once at module level
import os
from scm.client import ScmClient

# Initialize client with credentials from environment variables
client = ScmClient(
    client_id=os.getenv("SCM_CLIENT_ID"),
    client_secret=os.getenv("SCM_CLIENT_SECRET"),
    tsg_id=os.getenv("SCM_TSG_ID")
)

# Now you can use the client in your nodes:
# tag_result = client.tag.create({...})
# address_result = client.address.create({...})
# group_result = client.address_group.create({...})
```

**Key Production Patterns:**

1. **Initialize Once**: Create the ScmClient instance once (module level or app startup)
2. **Reuse Everywhere**: Pass the client instance to nodes or use as module-level variable
3. **Environment Variables**: Store credentials in environment variables (never hard-code!)
4. **Error Handling**: Wrap all client calls in `try`/`except` blocks (we'll see this next)

**Why This Matters:**
- ✅ Performance: Avoid recreating client on every operation
- ✅ Security: Credentials from environment, not code
- ✅ Best Practice: Matches pan-scm-sdk documentation patterns
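One practical wrinkle with `os.getenv` is that it silently returns `None` for an unset variable, so a missing credential may only surface later as an authentication failure. A possible refinement, sketched below, is to check the three variables up front and fail with a clear message. `load_scm_credentials` is a hypothetical helper, not part of pan-scm-sdk, and the demonstration uses a fake environment rather than real credentials.

```python
import os
from collections.abc import Mapping


def load_scm_credentials(env: Mapping[str, str] = os.environ) -> dict:
    """Collect the three SCM settings, failing fast if any is missing or empty."""
    required = {
        "client_id": "SCM_CLIENT_ID",
        "client_secret": "SCM_CLIENT_SECRET",
        "tsg_id": "SCM_TSG_ID",
    }
    missing = [var for var in required.values() if not env.get(var)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {arg: env[var] for arg, var in required.items()}


# Demonstration with a fake environment instead of real credentials.
fake_env = {"SCM_CLIENT_ID": "id", "SCM_CLIENT_SECRET": "secret", "SCM_TSG_ID": "123"}
credentials = load_scm_credentials(fake_env)
print(sorted(credentials))

# In production this could feed the client shown above:
# client = ScmClient(**load_scm_credentials())
```

The returned keys match the keyword arguments used in the `ScmClient(...)` call above, so the dictionary can be unpacked directly.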

In Notebooks 105-107, you'll use this client initialization pattern with actual API operations!

#### Create Configuration Node

This node simulates creating all three objects in SCM:

In [None]:
def configure_scm_objects(state: SCMWorkflowState) -> dict:
    """Simulate creating tag → address → group in SCM.

    In production, this would be:
    1. client.tag.create({"name": "Production", "color": "Red", ...})
    2. client.address.create({"name": "web_server", "ip_netmask": "...", "tag": ["Production"]})
    3. client.address_group.create({"name": "web_servers", "static": ["web_server"], ...})
    """
    folder = state["folder"]
    tag_name = state["tag_name"]
    address_name = state["address_name"]
    ip_netmask = state["ip_netmask"]
    group_name = state["group_name"]

    # Simulate the workflow
    log = f"""SCM Configuration Workflow in folder '{folder}':

Step 1: Created tag '{tag_name}' (color: Red)
   → Simulates: client.tag.create({{"name": "{tag_name}", "color": "Red", "folder": "{folder}"}})

Step 2: Created address '{address_name}' ({ip_netmask}) with tag [{tag_name}]
   → Simulates: client.address.create({{
         "name": "{address_name}",
         "ip_netmask": "{ip_netmask}",
         "folder": "{folder}",
         "tag": ["{tag_name}"]
     }})

Step 3: Created address group '{group_name}' containing [{address_name}]
   → Simulates: client.address_group.create({{
         "name": "{group_name}",
         "static": ["{address_name}"],
         "folder": "{folder}",
         "tag": ["{tag_name}"]
     }})

✅ Complete SCM configuration created successfully!
"""

    return {"workflow_log": log}

print("✅ configure_scm_objects node defined!")
print("\n💡 This demonstrates the SCM dependency pattern: tags → addresses → groups")

#### Build and Execute

Let's build a simple graph and run the workflow:

In [None]:
from langgraph.graph import StateGraph, START, END

# Build graph
scm_workflow_graph = StateGraph(SCMWorkflowState)
scm_workflow_graph.add_node("configure", configure_scm_objects)
scm_workflow_graph.set_entry_point("configure")
scm_workflow_graph.set_finish_point("configure")
scm_workflow_app = scm_workflow_graph.compile()

# Execute
result = scm_workflow_app.invoke({
    "folder": "Texas",
    "tag_name": "Production",
    "address_name": "web_server",
    "ip_netmask": "192.168.1.10/32",
    "group_name": "web_servers",
    "workflow_log": ""
})

print(result["workflow_log"])

#### Why This Matters

This simple example shows:
- **Dependency Order**: Tags created before addresses, addresses before groups
- **Real SCM Patterns**: Actual `pan-scm-sdk` API structure from `docs/examples/`
- **Sequential Logic**: Foundation for multi-node workflows (coming in Notebook 105!)

In **Notebook 105**, we'll learn how to split this into **separate nodes** with proper error handling and state management.

For now, you've seen a realistic SCM workflow! Let's move on to learn the sequential multi-node pattern that makes this more robust.

<a id='whats-next'></a>

---

## What's Next

Congratulations! You've mastered working with complex state structures in LangGraph. You now know how to:

- Design multi-field state schemas with TypedDict
- Work with diverse data types (strings, lists, integers, booleans, dictionaries)
- Implement safe state access patterns
- Handle optional fields with proper defaults
- Build complex SCM object structures for network automation

### In Notebook 105: Sequential Workflows

In the next notebook, we'll build on these state management skills by creating **sequential workflows** where multiple nodes work together in a specific order. You'll learn:

- **Multi-node graphs** - Building workflows with multiple processing steps
- **Sequential execution** - Controlling the order of operations
- **State transformations** - How data flows and transforms through the workflow
- **Real-world SCM workflows** - Complete configuration deployment pipelines

This is where your LangGraph knowledge will really come together to build powerful automation workflows!

**Ready to continue?** Head to **Notebook 105: Sequential Workflows** to build your first multi-step automation pipeline!


---

## Summary

In this notebook, you learned how to work with complex state structures in LangGraph. The key takeaways:

1. **Multi-field state schemas** enable tracking multiple related pieces of information
2. **Type diversity** - State can include strings, lists, integers, booleans, and dictionaries
3. **Safe access patterns** - Always check if optional fields exist before accessing them
4. **State initialization** - A field exists in state only after it has been provided as input or set by a node
5. **Real-world applications** - Complex state structures enable sophisticated network automation workflows

These concepts form the foundation for building production-ready LangGraph applications!
