In [52]:
# Cell 1: 임포트 & 설정
import os
import json
import re
import asyncio
import tempfile
import traceback
from copy import deepcopy
from datetime import datetime
from typing import Dict, List, Any, Optional, Union
from pathlib import Path

# 프로젝트 모듈들
from state import State, create_empty_user_memo, touch_processing_timestamp
from db import db, engine

# LLM 관련
from openai import OpenAI
from dotenv import load_dotenv

load_dotenv()

# OpenAI 클라이언트 초기화
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

print("✅ 모든 모듈 임포트 완료")
print(f"✅ OpenAI API 키 설정: {'있음' if os.getenv('OPENAI_API_KEY') else '없음'}")
print(f"✅ DB 연결 상태: {db.get_table_info() is not None}")

✅ 모든 모듈 임포트 완료
✅ OpenAI API 키 설정: 있음
✅ DB 연결 상태: True


In [13]:
# Cell 2-1: parsing_node 개선된 버전
def parsing_node(state: dict) -> dict:
    """개선된 parsing_node - DB 업데이트 요청도 구분"""
    
    state['processing_timestamp'] = datetime.now().isoformat(timespec="seconds")
    
    user_input = state.get('user_input', '')
    if not user_input or not user_input.strip():
        state['status'] = "error"
        state['reason'] = "Empty user input"
        return state
    
    try:
        # 개선된 system_prompt
        system_prompt = """You are a wedding planning assistant. Analyze the user input and extract:

1. VENDOR_TYPE: wedding_hall, studio, wedding_dress, makeup, or null
2. REGION: Korean location keywords (강남, 청담, 압구정, etc.) or null  
3. INTENT_HINT: 
   - "recommend": 업체 추천 요청
   - "tool": DB 업데이트, 계산, 검색이 필요한 요청
   - "general": 일반 상담, 정보 제공
4. UPDATE_TYPE: 사용자 정보 업데이트인 경우 (wedding_date, budget, guest_count, preferred_location, null)
5. BUDGET_MENTIONED: any budget numbers in Korean won (만원 units)

Respond ONLY in this JSON format:
{
    "vendor_type": "wedding_hall|studio|wedding_dress|makeup|null",
    "region_keyword": "지역명|null", 
    "intent_hint": "recommend|tool|general",
    "update_type": "wedding_date|budget|guest_count|preferred_location|null",
    "budget_manwon": number_or_null,
    "confidence": 0.0-1.0
}

Examples:
- "강남 웨딩홀 추천해줘" → {"vendor_type": "wedding_hall", "region_keyword": "강남", "intent_hint": "recommend", "update_type": null, "budget_manwon": null, "confidence": 0.9}
- "결혼식 날짜 크리스마스로 바꿔줘" → {"vendor_type": null, "region_keyword": null, "intent_hint": "tool", "update_type": "wedding_date", "budget_manwon": null, "confidence": 0.95}
- "총 예산 5천만원으로 수정해주세요" → {"vendor_type": null, "region_keyword": null, "intent_hint": "tool", "update_type": "budget", "budget_manwon": 5000, "confidence": 0.95}"""

        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_input}
            ],
            temperature=0.1,
            max_tokens=300
        )
        
        parsed_response = json.loads(response.choices[0].message.content)
        
        # State 업데이트
        state['vendor_type'] = parsed_response.get("vendor_type")
        if state['vendor_type'] == "null":
            state['vendor_type'] = None
            
        state['region_keyword'] = parsed_response.get("region_keyword") 
        if state['region_keyword'] == "null":
            state['region_keyword'] = None
            
        state['intent_hint'] = parsed_response.get("intent_hint", "general")
        state['update_type'] = parsed_response.get("update_type")  # 새로 추가
        
        budget = parsed_response.get("budget_manwon")
        if budget and isinstance(budget, (int, float)):
            state['total_budget_manwon'] = int(budget)
        
        state['status'] = "ok"
        state['error_info'] = {
            "parsing_confidence": parsed_response.get("confidence", 0.0),
            "raw_llm_response": parsed_response
        }
        
    except Exception as e:
        state['status'] = "error" 
        state['reason'] = f"Parsing node error: {str(e)}"
        state['intent_hint'] = "general"
    
    return state

In [16]:
# Cell 3-1: 추가 테스트 케이스 (DB 업데이트 관련)
def test_parsing_node_advanced():
    """DB 업데이트 및 복잡한 요청 테스트"""
    
    test_cases = [
        {
            "name": "결혼식 날짜 변경 요청",
            "input": "결혼식 날짜 올해 크리스마스 바로 다음 날짜로 바꿔줘",
            "expected_intent": "tool"  # user_db_update_tool 필요
        },
        {
            "name": "예산 정보 업데이트",
            "input": "총 예산을 5천만원으로 수정해주세요",
            "expected_intent": "tool",
            "expected_budget": 5000
        },
        {
            "name": "하객 수 변경",
            "input": "하객 수를 150명으로 바꿔주세요",
            "expected_intent": "tool"
        },
        {
            "name": "지역 선호도 업데이트",
            "input": "선호 지역을 강남에서 홍대로 바꿔줘",
            "expected_intent": "tool"
        }
    ]
    
    print("🧪 고급 parsing_node 테스트 시작\n")
    
    for i, test in enumerate(test_cases, 1):
        print(f"테스트 {i}: {test['name']}")
        print(f"입력: '{test['input']}'")
        
        test_state = {
            'user_id': f"test_user_adv_{i:03d}",
            'user_input': test['input'],
            'messages': []
        }
        
        result_state = parsing_node(test_state)
        
        print(f"상태: {result_state.get('status', 'unknown')}")
        if result_state.get('status') == "error":
            print(f"❌ 에러: {result_state.get('reason')}")
        else:
            print(f"✅ vendor_type: {result_state.get('vendor_type')}")
            print(f"✅ region_keyword: {result_state.get('region_keyword')}")
            print(f"✅ intent_hint: {result_state.get('intent_hint')}")
            print(f"✅ budget_manwon: {result_state.get('total_budget_manwon')}")
            
            # 기대값과 비교
            expected_intent = test.get('expected_intent')
            actual_intent = result_state.get('intent_hint')
            if expected_intent and actual_intent != expected_intent:
                print(f"⚠️  예상 intent: {expected_intent}, 실제: {actual_intent}")
            
        print("-" * 50)
    
    print("🧪 고급 parsing_node 테스트 완료")

# 테스트 실행
test_parsing_node_advanced()

🧪 고급 parsing_node 테스트 시작

테스트 1: 결혼식 날짜 변경 요청
입력: '결혼식 날짜 올해 크리스마스 바로 다음 날짜로 바꿔줘'
상태: ok
✅ vendor_type: None
✅ region_keyword: None
✅ intent_hint: tool
✅ budget_manwon: None
--------------------------------------------------
테스트 2: 예산 정보 업데이트
입력: '총 예산을 5천만원으로 수정해주세요'
상태: ok
✅ vendor_type: None
✅ region_keyword: None
✅ intent_hint: tool
✅ budget_manwon: 5000
--------------------------------------------------
테스트 3: 하객 수 변경
입력: '하객 수를 150명으로 바꿔주세요'
상태: ok
✅ vendor_type: None
✅ region_keyword: None
✅ intent_hint: tool
✅ budget_manwon: None
--------------------------------------------------
테스트 4: 지역 선호도 업데이트
입력: '선호 지역을 강남에서 홍대로 바꿔줘'
상태: ok
✅ vendor_type: None
✅ region_keyword: None
✅ intent_hint: tool
✅ budget_manwon: None
--------------------------------------------------
🧪 고급 parsing_node 테스트 완료


In [20]:
# Cell 4: memo_check_node 함수 정의
def memo_check_node(state: dict) -> dict:
    """
    Load and validate user's long-term memory for wedding planning context.
    
    This node manages persistent user information stored in JSON files:
    - Checks if user memory file exists for the given user_id
    - Loads existing memory or creates new empty memory structure
    - Validates memory integrity and schema version
    - Updates state with current user profile, preferences, and conversation history
    - Sets memo_file_path for future updates
    
    The memory structure includes:
    - User profile (wedding_date, budget, guest_count, preferred_locations)
    - Conversation summary from previous sessions
    - Schema version for compatibility
    
    Args:
        state (dict): Current conversation state with user_id
        
    Returns:
        dict: Updated state with user_memo, memo_file_path, and memo_needs_update flag
    """
    
    user_id = state.get('user_id')
    if not user_id:
        state['status'] = "error"
        state['reason'] = "No user_id provided for memory check"
        return state
    
    try:
        # Get memory file path using state.py utility
        from state import get_memo_file_path, create_empty_user_memo
        
        memo_file_path = get_memo_file_path(user_id)
        state['memo_file_path'] = memo_file_path
        
        # Check if memory file exists
        if os.path.exists(memo_file_path):
            # Load existing memory
            with open(memo_file_path, 'r', encoding='utf-8') as f:
                user_memo = json.load(f)
            
            # Validate memory structure
            if not isinstance(user_memo, dict):
                raise ValueError("Invalid memory format: not a dictionary")
            
            if 'profile' not in user_memo:
                raise ValueError("Invalid memory format: missing profile")
            
            if 'version' not in user_memo:
                user_memo['version'] = "1.0"  # Add version if missing
            
            # Validate user_id consistency
            memo_user_id = user_memo.get('profile', {}).get('user_id')
            if memo_user_id != user_id:
                raise ValueError(f"User ID mismatch: expected {user_id}, found {memo_user_id}")
            
            state['user_memo'] = user_memo
            state['memo_needs_update'] = False
            
            print(f"✅ Loaded existing memory for user {user_id}")
            
        else:
            # Create new empty memory
            user_memo = create_empty_user_memo(user_id)
            state['user_memo'] = user_memo
            state['memo_needs_update'] = True  # New memory needs to be saved
            
            print(f"🆕 Created new memory for user {user_id}")
        
        # Extract profile information to state for easy access
        profile = user_memo.get('profile', {})
        
        # Update state with profile info if available
        if profile.get('total_budget_manwon'):
            state['total_budget_manwon'] = profile['total_budget_manwon']
        
        if profile.get('wedding_date'):
            state['wedding_date'] = profile['wedding_date']
        
        if profile.get('guest_count'):
            state['guest_count'] = profile['guest_count']
        
        if profile.get('preferred_locations'):
            state['preferred_locations'] = profile['preferred_locations']
        
        # Add conversation context
        conversation_summary = user_memo.get('conversation_summary')
        if conversation_summary:
            state['conversation_summary'] = conversation_summary
        
        state['status'] = "ok"
        
        # Add debug info
        state['memo_debug'] = {
            "memo_file_exists": os.path.exists(memo_file_path),
            "memo_version": user_memo.get('version', 'unknown'),
            "profile_completeness": len([k for k, v in profile.items() if v is not None])
        }
        
    except FileNotFoundError:
        state['status'] = "error"
        state['reason'] = f"Memory file not found: {memo_file_path}"
        
    except json.JSONDecodeError as e:
        state['status'] = "error"
        state['reason'] = f"Invalid JSON in memory file: {str(e)}"
        
    except ValueError as e:
        state['status'] = "error"
        state['reason'] = f"Memory validation failed: {str(e)}"
        
    except Exception as e:
        state['status'] = "error"
        state['reason'] = f"Memory check error: {str(e)}"
    
    return state

In [25]:
# Cell 5: memo_check_node 테스트 (간단 버전)
def test_memo_check_node_simple():
    """memo_check_node 간단 테스트"""
    
    test_cases = [
        {"name": "신규 사용자", "user_id": "new_user_001"},
        {"name": "기존 사용자", "user_id": "existing_user_001"},
        {"name": "다른 신규 사용자", "user_id": "new_user_002"}
    ]
    
    print("🧪 memo_check_node 테스트 시작\n")
    
    for i, test in enumerate(test_cases, 1):
        print(f"테스트 {i}: {test['name']}")
        print(f"사용자 ID: {test['user_id']}")
        
        # 기존 사용자용 샘플 메모리 생성
        if "existing" in test['user_id']:
            from state import get_memo_file_path, create_empty_user_memo
            memo_path = get_memo_file_path(test['user_id'])
            os.makedirs("memories", exist_ok=True)
            
            sample_memo = create_empty_user_memo(test['user_id'])
            sample_memo['profile']['total_budget_manwon'] = 3000
            sample_memo['profile']['wedding_date'] = "2025-10-15"
            
            with open(memo_path, 'w', encoding='utf-8') as f:
                json.dump(sample_memo, f, ensure_ascii=False, indent=2)
        
        # 테스트 실행
        test_state = {'user_id': test['user_id']}
        result_state = memo_check_node(test_state)
        
        # 결과 출력
        print(f"상태: {result_state.get('status')}")
        if result_state.get('status') == "ok":
            print(f"✅ 메모리 로드/생성 성공")
            if result_state.get('user_memo', {}).get('profile', {}).get('total_budget_manwon'):
                print(f"✅ 기존 예산: {result_state['user_memo']['profile']['total_budget_manwon']}만원")
        else:
            print(f"❌ 에러: {result_state.get('reason')}")
        
        print("-" * 40)
    
    print("🧪 memo_check_node 테스트 완료")

# 테스트 실행
test_memo_check_node_simple()

🧪 memo_check_node 테스트 시작

테스트 1: 신규 사용자
사용자 ID: new_user_001
🆕 Created new memory for user new_user_001
상태: ok
✅ 메모리 로드/생성 성공
----------------------------------------
테스트 2: 기존 사용자
사용자 ID: existing_user_001
✅ Loaded existing memory for user existing_user_001
상태: ok
✅ 메모리 로드/생성 성공
✅ 기존 예산: 3000만원
----------------------------------------
테스트 3: 다른 신규 사용자
사용자 ID: new_user_002
🆕 Created new memory for user new_user_002
상태: ok
✅ 메모리 로드/생성 성공
----------------------------------------
🧪 memo_check_node 테스트 완료


In [26]:
# 간단 확인
print("🔍 현재 memories 폴더 내용:")
import os
if os.path.exists("memories"):
    files = os.listdir("memories")
    for file in files:
        print(f"  - {file}")
else:
    print("  memories 폴더 없음")

print("\n🔍 new_user_002 상태 확인:")
test_state = {'user_id': 'new_user_002'}
result = memo_check_node(test_state)
print(f"memo_needs_update: {result.get('memo_needs_update')}")
print(f"user_memo 존재: {'user_memo' in result}")

🔍 현재 memories 폴더 내용:
  - user_existing_user_001_memo.json

🔍 new_user_002 상태 확인:
🆕 Created new memory for user new_user_002
memo_needs_update: True
user_memo 존재: True


In [31]:
# Cell 6: conditional_router 함수 정의
def conditional_router(state: dict) -> dict:
    """
    Central decision hub that determines the next processing route based on parsed intent and memory context.
    
    This router analyzes the combination of:
    - User intent from parsing_node (recommend, tool, general)
    - Current memory state and missing information
    - Vendor type and region preferences
    - Update requirements (budget, date, preferences)
    
    Routing Logic:
    - "recommend" intent → recommendation_node (for vendor suggestions)
    - "tool" intent → tool_execution_node (for DB queries, calculations, updates)
    - "general" intent → general_response_node (for FAQ, guidance, chat)
    
    The router also sets up tool execution plans and priorities based on the specific request type.
    
    Args:
        state (dict): State with parsed_intent, user_memo, and other context
        
    Returns:
        dict: Updated state with routing_decision and tools_to_execute list
    """
    
    # 라우팅에 필요한 정보 수집
    intent_hint = state.get('intent_hint', 'general')
    vendor_type = state.get('vendor_type')
    region_keyword = state.get('region_keyword')
    update_type = state.get('update_type')  # parsing_node_v2에서 온 정보
    user_memo = state.get('user_memo', {})
    user_input = state.get('user_input', '').lower()
    
    try:
        # 1. 업데이트 요청이 있는 경우 → tool_execution (최우선순위)
        if update_type:
            state['routing_decision'] = "tool_execution"
            state['tools_to_execute'] = ["user_db_update_tool"]
            state['reason'] = f"User data update required: {update_type}"
            
        # 2. 추천 요청인 경우
        elif intent_hint == "recommend":
            profile = user_memo.get('profile', {})
            
            if vendor_type:
                # 특정 업체 타입 추천 → tool_execution으로 (메모리 상태 무관)
                state['routing_decision'] = "tool_execution"  
                state['tools_to_execute'] = ["db_query_tool"]
                
                # 웹 검색도 병행하여 최신 정보 수집
                if region_keyword:
                    state['tools_to_execute'].append("web_search_tool")
                
                state['reason'] = f"Specific vendor recommendation: {vendor_type}"
                
            else:
                # 일반적인 추천 요청 → recommendation_node에서 전략 수립
                state['routing_decision'] = "recommendation"
                state['reason'] = "General recommendation request"
        
        # 3. 툴 실행이 필요한 경우
        elif intent_hint == "tool":
            state['routing_decision'] = "tool_execution"
            
            # 요청 내용에 따라 필요한 툴들 결정
            tools_needed = []
            
            # 예산 계산이 필요한 경우
            if any(keyword in user_input for keyword in ['계산', '예산', '총액', '비용']):
                tools_needed.append("calculator_tool")
                # 예산 계산은 보통 DB 정보도 필요함
                tools_needed.append("db_query_tool")
            
            # DB 조회가 필요한 경우  
            if vendor_type or region_keyword:
                if "db_query_tool" not in tools_needed:
                    tools_needed.append("db_query_tool")
            
            # 최신 정보 검색이 필요한 경우
            if any(keyword in user_input for keyword in ['최신', '요즘', '트렌드', '후기']):
                tools_needed.append("web_search_tool")
            
            # 기본적으로 DB 조회는 포함 (다른 툴이 없는 경우만)
            if not tools_needed:
                tools_needed.append("db_query_tool")
            
            state['tools_to_execute'] = tools_needed
            state['reason'] = f"Tool execution needed: {', '.join(tools_needed)}"
        
        # 4. 일반 대화/상담인 경우
        else:  # intent_hint == "general"
            state['routing_decision'] = "general_response"
            state['tools_to_execute'] = []
            state['reason'] = "General conversation or FAQ"
        
        # 5. 메모리 상태 기반 조정 로직 (신중하게 적용)
        profile = user_memo.get('profile', {})
        missing_info = []
        
        if not profile.get('wedding_date'):
            missing_info.append("wedding_date")
        if not profile.get('total_budget_manwon'):
            missing_info.append("budget")
        if not profile.get('guest_count'):
            missing_info.append("guest_count")
        
        # 정보 부족한 상황에서 일반적 추천 요청인 경우만 상담으로 전환
        # (구체적 업체 요청은 제외)
        if (len(missing_info) >= 3 and 
            state.get('routing_decision') == "recommendation" and 
            not vendor_type):
            state['routing_decision'] = "general_response"
            state['reason'] = f"Insufficient profile data for recommendation: missing {', '.join(missing_info)}"
        
        state['status'] = "ok"
        
        # 라우팅 결정에 대한 메타데이터 저장
        state['routing_metadata'] = {
            "original_intent": intent_hint,
            "final_decision": state['routing_decision'],
            "tools_count": len(state.get('tools_to_execute', [])),
            "missing_profile_info": missing_info,
            "decision_factors": {
                "has_vendor_type": bool(vendor_type),
                "has_region": bool(region_keyword), 
                "has_update_request": bool(update_type),
                "profile_completeness": len([v for v in profile.values() if v is not None])
            }
        }
        
    except Exception as e:
        state['status'] = "error"
        state['reason'] = f"Routing decision error: {str(e)}"
        state['routing_decision'] = "general_response"  # 안전한 폴백
        state['tools_to_execute'] = []
    
    return state

In [33]:
# Cell 7: conditional_router 테스트
def test_conditional_router():
    """conditional_router 함수를 다양한 시나리오로 테스트"""
    
    test_cases = [
        {
            "name": "웨딩홀 추천 요청 (구체적)",
            "setup": {
                "intent_hint": "recommend",
                "vendor_type": "wedding_hall", 
                "region_keyword": "강남",
                "user_input": "강남 웨딩홀 추천해주세요"
            },
            "expected_routing": "tool_execution",
            "expected_tools": ["db_query_tool", "web_search_tool"]
        },
        {
            "name": "일반 추천 요청",
            "setup": {
                "intent_hint": "recommend",
                "vendor_type": None,
                "user_input": "결혼 준비 어떻게 시작하면 좋을까요?"
            },
            "expected_routing": "recommendation"
        },
        {
            "name": "예산 계산 요청",
            "setup": {
                "intent_hint": "tool",
                "user_input": "총 예산 계산해주세요"
            },
            "expected_routing": "tool_execution",
            "expected_tools": ["calculator_tool", "db_query_tool"]
        },
        {
            "name": "사용자 정보 업데이트",
            "setup": {
                "intent_hint": "tool",
                "update_type": "wedding_date",
                "user_input": "결혼식 날짜를 크리스마스로 바꿔주세요"
            },
            "expected_routing": "tool_execution",
            "expected_tools": ["user_db_update_tool"]
        },
        {
            "name": "일반 질문",
            "setup": {
                "intent_hint": "general",
                "user_input": "결혼 준비 기간은 보통 얼마나 걸리나요?"
            },
            "expected_routing": "general_response"
        }
    ]
    
    print("🧪 conditional_router 테스트 시작\n")
    
    for i, test in enumerate(test_cases, 1):
        print(f"테스트 {i}: {test['name']}")
        print(f"입력: '{test['setup']['user_input']}'")
        
        # 테스트 상태 생성 (기본 메모리 포함)
        test_state = {
            'user_id': f"router_test_{i:03d}",
            'user_memo': {
                'profile': {
                    'user_id': f"router_test_{i:03d}",
                    'wedding_date': "2025-10-15" if i % 2 == 0 else None,  # 절반은 정보 있음
                    'total_budget_manwon': 3000 if i % 2 == 0 else None,
                    'guest_count': None,
                    'preferred_locations': []
                },
                'version': "1.0"
            }
        }
        
        # setup 정보를 state에 추가
        test_state.update(test['setup'])
        
        # conditional_router 실행
        result_state = conditional_router(test_state)
        
        # 결과 출력
        print(f"상태: {result_state.get('status')}")
        if result_state.get('status') == "error":
            print(f"❌ 에러: {result_state.get('reason')}")
        else:
            routing = result_state.get('routing_decision')
            tools = result_state.get('tools_to_execute', [])
            
            print(f"✅ 라우팅 결정: {routing}")
            print(f"✅ 실행할 툴: {tools}")
            print(f"✅ 결정 이유: {result_state.get('reason')}")
            
            # 기대값과 비교
            expected_routing = test.get('expected_routing')
            expected_tools = test.get('expected_tools', [])
            
            if expected_routing and routing != expected_routing:
                print(f"⚠️  예상 라우팅: {expected_routing}, 실제: {routing}")
            
            if expected_tools and set(tools) != set(expected_tools):
                print(f"⚠️  예상 툴: {expected_tools}, 실제: {tools}")
            
            # 메타데이터 출력
            if result_state.get('routing_metadata'):
                meta = result_state['routing_metadata']
                print(f"📊 메타데이터: 프로필 완성도 {meta['decision_factors']['profile_completeness']}/4")
        
        print("-" * 50)
    
    print("🧪 conditional_router 테스트 완료")

# 테스트 실행
test_conditional_router()

🧪 conditional_router 테스트 시작

테스트 1: 웨딩홀 추천 요청 (구체적)
입력: '강남 웨딩홀 추천해주세요'
상태: ok
✅ 라우팅 결정: tool_execution
✅ 실행할 툴: ['db_query_tool', 'web_search_tool']
✅ 결정 이유: Specific vendor recommendation: wedding_hall
📊 메타데이터: 프로필 완성도 2/4
--------------------------------------------------
테스트 2: 일반 추천 요청
입력: '결혼 준비 어떻게 시작하면 좋을까요?'
상태: ok
✅ 라우팅 결정: recommendation
✅ 실행할 툴: []
✅ 결정 이유: General recommendation request
📊 메타데이터: 프로필 완성도 4/4
--------------------------------------------------
테스트 3: 예산 계산 요청
입력: '총 예산 계산해주세요'
상태: ok
✅ 라우팅 결정: tool_execution
✅ 실행할 툴: ['calculator_tool', 'db_query_tool']
✅ 결정 이유: Tool execution needed: calculator_tool, db_query_tool
📊 메타데이터: 프로필 완성도 2/4
--------------------------------------------------
테스트 4: 사용자 정보 업데이트
입력: '결혼식 날짜를 크리스마스로 바꿔주세요'
상태: ok
✅ 라우팅 결정: tool_execution
✅ 실행할 툴: ['user_db_update_tool']
✅ 결정 이유: User data update required: wedding_date
📊 메타데이터: 프로필 완성도 4/4
--------------------------------------------------
테스트 5: 일반 질문
입력: '결혼 준비 기간은 보통 얼마나 걸리나요?'
상태: 

In [35]:
# Cell 8: tool_execution_node 구현

def tool_execution_node(state: dict) -> dict:
    """
    Tool execution hub that manages and executes multiple tools based on routing decisions.
    
    This node handles:
    - Sequential and parallel tool execution
    - Tool execution order optimization
    - Result validation and post-processing
    - Error handling and fallback strategies
    - Tool result aggregation and formatting
    
    Execution Patterns:
    - Sequential: DB query → Web search (for data enrichment)
    - Parallel: Calculator + DB query (independent operations)
    - Conditional: Execute additional tools based on initial results
    
    Args:
        state (dict): State containing tools_to_execute list and context
        
    Returns:
        dict: Updated state with tool_results and execution metadata
    """
    
    tools_to_execute = state.get('tools_to_execute', [])
    user_memo = state.get('user_memo', {})
    
    # 툴 실행 결과를 저장할 딕셔너리
    tool_results = {}
    execution_log = []
    
    try:
        if not tools_to_execute:
            state['tool_results'] = {}
            state['reason'] = "No tools to execute"
            state['status'] = "ok"
            return state
        
        print(f"🔧 실행할 툴: {tools_to_execute}")
        
        # 각 툴을 순차적으로 실행
        for tool_name in tools_to_execute:
            execution_start = datetime.now()
            
            try:
                print(f"⚡ {tool_name} 실행 중...")
                
                if tool_name == "db_query_tool":
                    result = execute_db_query_tool(state)
                elif tool_name == "web_search_tool":
                    result = execute_web_search_tool(state)
                elif tool_name == "calculator_tool":
                    result = execute_calculator_tool(state)
                elif tool_name == "user_db_update_tool":
                    result = execute_user_db_update_tool(state)
                else:
                    result = {
                        "success": False,
                        "error": f"Unknown tool: {tool_name}",
                        "data": None
                    }
                
                execution_end = datetime.now()
                execution_time = (execution_end - execution_start).total_seconds()
                
                # 결과 저장
                tool_results[tool_name] = result
                
                # 실행 로그 기록
                log_entry = {
                    "tool_name": tool_name,
                    "execution_time": execution_time,
                    "success": result.get("success", False),
                    "timestamp": execution_end.isoformat()
                }
                execution_log.append(log_entry)
                
                print(f"✅ {tool_name} 완료 ({execution_time:.2f}초)")
                
                if not result.get("success", False):
                    print(f"⚠️ {tool_name} 실행 실패: {result.get('error', 'Unknown error')}")
                
            except Exception as e:
                error_result = {
                    "success": False,
                    "error": f"Tool execution failed: {str(e)}",
                    "data": None
                }
                tool_results[tool_name] = error_result
                
                execution_end = datetime.now()
                execution_time = (execution_end - execution_start).total_seconds()
                
                log_entry = {
                    "tool_name": tool_name,
                    "execution_time": execution_time,
                    "success": False,
                    "error": str(e),
                    "timestamp": execution_end.isoformat()
                }
                execution_log.append(log_entry)
                
                print(f"❌ {tool_name} 에러: {str(e)}")
        
        # 전체 실행 결과 요약
        successful_tools = [name for name, result in tool_results.items() if result.get("success")]
        failed_tools = [name for name, result in tool_results.items() if not result.get("success")]
        
        state['tool_results'] = tool_results
        state['execution_log'] = execution_log
        state['successful_tools'] = successful_tools
        state['failed_tools'] = failed_tools
        state['status'] = "ok"
        state['reason'] = f"Executed {len(successful_tools)}/{len(tools_to_execute)} tools successfully"
        
        print(f"📊 실행 완료: 성공 {len(successful_tools)}, 실패 {len(failed_tools)}")
        
    except Exception as e:
        state['tool_results'] = {}
        state['status'] = "error"
        state['reason'] = f"Tool execution node failed: {str(e)}"
        print(f"💥 tool_execution_node 전체 실패: {str(e)}")
    
    return state


def execute_db_query_tool(state: dict) -> dict:
    """
    Database query tool for wedding vendor information.
    
    Searches the database for venues, studios, dresses, makeup services
    based on user criteria like location, budget, style preferences.
    """
    try:
        # 쿼리 파라미터 추출
        vendor_type = state.get('vendor_type')
        region_keyword = state.get('region_keyword')
        user_memo = state.get('user_memo', {})
        profile = user_memo.get('profile', {})
        budget = profile.get('total_budget_manwon')
        
        # 모의 데이터베이스 쿼리 결과
        if vendor_type == "wedding_hall":
            mock_results = [
                {
                    "name": "호텔 루미에르",
                    "type": "웨딩홀",
                    "location": "강남구",
                    "price_manwon": 2950,
                    "capacity": 200,
                    "rating": 4.6,
                    "features": ["원본 제공", "주차 300대", "야간가든"]
                },
                {
                    "name": "더채플앳청담",
                    "type": "웨딩홀", 
                    "location": "강남구",
                    "price_manwon": 4400,
                    "capacity": 300,
                    "rating": 4.8,
                    "features": ["채플홀", "스냅사진", "스냅비디오"]
                }
            ]
        elif vendor_type == "studio":
            mock_results = [
                {
                    "name": "스튜디오 노바",
                    "type": "스튜디오",
                    "location": "성수동",
                    "price_manwon": 350,
                    "rating": 4.8,
                    "features": ["원본 제공", "야외 촬영", "드레스 대여"]
                }
            ]
        else:
            # 일반적인 검색
            mock_results = [
                {
                    "name": "종합 웨딩 패키지",
                    "type": "종합",
                    "estimated_total": 5000,
                    "components": ["웨딩홀", "스튜디오", "드레스", "메이크업"]
                }
            ]
        
        # 지역 필터링
        if region_keyword and vendor_type:
            mock_results = [r for r in mock_results if region_keyword in r.get('location', '')]
        
        # 예산 필터링
        if budget and vendor_type:
            mock_results = [r for r in mock_results 
                          if r.get('price_manwon', 0) <= budget * 1.2]  # 20% 여유
        
        return {
            "success": True,
            "data": {
                "query_params": {
                    "vendor_type": vendor_type,
                    "region": region_keyword,
                    "budget_limit": budget
                },
                "results": mock_results,
                "total_count": len(mock_results)
            },
            "message": f"Found {len(mock_results)} matching vendors"
        }
        
    except Exception as e:
        return {
            "success": False,
            "error": f"Database query failed: {str(e)}",
            "data": None
        }


def execute_web_search_tool(state: dict) -> dict:
    """
    Web search tool for latest wedding trends and vendor reviews.
    
    Searches for current wedding trends, recent reviews, price updates,
    and seasonal promotions to complement database information.
    """
    try:
        vendor_type = state.get('vendor_type')
        region_keyword = state.get('region_keyword')
        
        # 검색 쿼리 생성
        search_query = []
        if vendor_type:
            if vendor_type == "wedding_hall":
                search_query.append("웨딩홀")
            elif vendor_type == "studio":
                search_query.append("스튜디오")
        
        if region_keyword:
            search_query.append(region_keyword)
        
        search_query.append("후기 2024 2025")
        query_string = " ".join(search_query)
        
        # 모의 웹 검색 결과
        mock_web_results = [
            {
                "title": f"2025년 {vendor_type or '웨딩'} 트렌드 총정리",
                "url": "https://example.com/wedding-trends-2025",
                "snippet": "올해 가장 인기 있는 웨딩 트렌드와 예산 절약 팁을 소개합니다.",
                "source": "웨딩전문블로그",
                "date": "2024-12-15"
            },
            {
                "title": f"{region_keyword or '서울'} 웨딩 업체 최신 할인 정보",
                "url": "https://example.com/wedding-discount", 
                "snippet": "연말연시 특별 할인 이벤트 진행 중인 업체들을 모아봤습니다.",
                "source": "웨딩커뮤니티",
                "date": "2024-12-20"
            },
            {
                "title": "실제 후기: 웨딩 준비 총비용 공개",
                "url": "https://example.com/real-wedding-cost",
                "snippet": "실제 신혼부부가 공개하는 웨딩 총비용과 예산 분배 노하우",
                "source": "개인블로그",
                "date": "2024-11-28"
            }
        ]
        
        return {
            "success": True,
            "data": {
                "search_query": query_string,
                "results": mock_web_results,
                "total_results": len(mock_web_results),
                "search_metadata": {
                    "search_date": datetime.now().isoformat(),
                    "result_freshness": "recent"
                }
            },
            "message": f"Found {len(mock_web_results)} recent web results"
        }
        
    except Exception as e:
        return {
            "success": False,
            "error": f"Web search failed: {str(e)}",
            "data": None
        }


def execute_calculator_tool(state: dict) -> dict:
    """
    Calculator tool for wedding budget planning and cost analysis.
    
    Performs budget calculations, cost breakdowns, savings analysis,
    and financial planning for wedding preparation.
    """
    try:
        user_memo = state.get('user_memo', {})
        profile = user_memo.get('profile', {})
        user_input = state.get('user_input', '').lower()
        
        total_budget = profile.get('total_budget_manwon', 5000)  # 기본값 5000만원
        guest_count = profile.get('guest_count', 100)  # 기본값 100명
        
        # 표준 예산 분배 비율
        budget_allocation = {
            "wedding_hall": 0.50,      # 50%: 예식장 + 식사
            "studio": 0.15,            # 15%: 스튜디오 촬영
            "dress_makeup": 0.20,      # 20%: 드레스 + 메이크업
            "misc": 0.10,              # 10%: 기타 (꽃, 예물 등)
            "honeymoon": 0.05          # 5%: 신혼여행 (별도 예산)
        }
        
        # 항목별 예산 계산
        calculated_budget = {}
        for category, ratio in budget_allocation.items():
            calculated_budget[category] = int(total_budget * ratio)
        
        # 1인당 식사비 계산 (예식장 예산의 60%가 식사비라고 가정)
        meal_cost_per_person = int((calculated_budget["wedding_hall"] * 0.6 * 10000) / guest_count)
        
        # 절약 팁 계산
        savings_tips = [
            {
                "category": "스튜디오",
                "tip": "평일 촬영 선택",
                "saving_amount": int(calculated_budget["studio"] * 0.2),
                "saving_percentage": 20
            },
            {
                "category": "드레스",
                "tip": "렌탈 vs 구매 비교",
                "saving_amount": int(calculated_budget["dress_makeup"] * 0.3),
                "saving_percentage": 30
            },
            {
                "category": "예식장",
                "tip": "비수기 할인 활용",
                "saving_amount": int(calculated_budget["wedding_hall"] * 0.15),
                "saving_percentage": 15
            }
        ]
        
        total_potential_savings = sum(tip["saving_amount"] for tip in savings_tips)
        
        # D-day 계산
        wedding_date = profile.get('wedding_date')
        days_until_wedding = None
        if wedding_date:
            try:
                from datetime import datetime
                wedding_dt = datetime.strptime(wedding_date, '%Y-%m-%d')
                today = datetime.now()
                days_until_wedding = (wedding_dt - today).days
            except:
                days_until_wedding = None
        
        calculation_results = {
            "total_budget_manwon": total_budget,
            "guest_count": guest_count,
            "budget_breakdown": calculated_budget,
            "meal_cost_per_person": meal_cost_per_person,
            "savings_opportunities": savings_tips,
            "total_potential_savings": total_potential_savings,
            "days_until_wedding": days_until_wedding,
            "monthly_saving_needed": int(total_budget / 12) if days_until_wedding and days_until_wedding > 30 else None
        }
        
        return {
            "success": True,
            "data": calculation_results,
            "message": f"Budget calculation completed for {total_budget}만원 total budget"
        }
        
    except Exception as e:
        return {
            "success": False,
            "error": f"Budget calculation failed: {str(e)}",
            "data": None
        }


def execute_user_db_update_tool(state: dict) -> dict:
    """
    User database update tool for profile information management.
    
    Updates user profile information like wedding date, budget,
    guest count, preferences, and tracks decision history.
    """
    try:
        update_type = state.get('update_type')
        user_input = state.get('user_input', '')
        user_memo = state.get('user_memo', {})
        profile = user_memo.get('profile', {})
        
        # 업데이트할 데이터 추출
        updates = {}
        update_log = []
        
        if update_type == "wedding_date":
            # 날짜 정보 추출 (간단한 예시)
            if "크리스마스" in user_input:
                updates['wedding_date'] = "2025-12-25"
                update_log.append("결혼 날짜를 크리스마스(2025-12-25)로 설정")
            elif "봄" in user_input:
                updates['wedding_date'] = "2025-04-15"  # 임시 봄 날짜
                update_log.append("결혼 날짜를 봄(2025-04-15)으로 설정")
            else:
                # 기본 날짜 파싱 로직 (실제로는 더 정교해야 함)
                updates['wedding_date'] = "2025-06-15"  # 기본값
                update_log.append("결혼 날짜 업데이트")
        
        elif update_type == "budget":
            # 예산 정보 추출
            import re
            budget_match = re.search(r'(\d+)(?:천만|천|만)', user_input)
            if budget_match:
                amount = int(budget_match.group(1))
                if "천만" in user_input:
                    amount *= 1000  # 천만원 -> 만원 단위
                elif "천" in user_input and "만" not in user_input:
                    amount /= 10    # 천원 -> 만원 단위
                updates['total_budget_manwon'] = int(amount)
                update_log.append(f"총 예산을 {amount}만원으로 설정")
        
        elif update_type == "guest_count":
            # 하객 수 추출
            import re
            guest_match = re.search(r'(\d+)(?:명|분|사람)', user_input)
            if guest_match:
                count = int(guest_match.group(1))
                updates['guest_count'] = count
                update_log.append(f"하객 수를 {count}명으로 설정")
        
        elif update_type == "location_preference":
            # 지역 선호도 업데이트
            locations = profile.get('preferred_locations', [])
            if '강남' in user_input:
                locations.append('강남구')
            if '홍대' in user_input:
                locations.append('마포구')
            updates['preferred_locations'] = list(set(locations))  # 중복 제거
            update_log.append(f"선호 지역 업데이트: {updates['preferred_locations']}")
        
        # 프로필 업데이트
        updated_profile = profile.copy()
        updated_profile.update(updates)
        
        # 업데이트 메타데이터
        update_metadata = {
            "update_type": update_type,
            "timestamp": datetime.now().isoformat(),
            "changes": updates,
            "log": update_log,
            "previous_values": {key: profile.get(key) for key in updates.keys()}
        }
        
        return {
            "success": True,
            "data": {
                "updated_profile": updated_profile,
                "update_metadata": update_metadata,
                "changes_made": len(updates)
            },
            "message": f"Successfully updated {len(updates)} profile fields"
        }
        
    except Exception as e:
        return {
            "success": False,
            "error": f"Profile update failed: {str(e)}",
            "data": None
        }



In [36]:
# Cell 9: tool_execution_node 테스트
def test_tool_execution_node():
    """tool_execution_node 함수를 다양한 시나리오로 테스트"""
    
    test_cases = [
        {
            "name": "DB 조회만 실행",
            "state": {
                "tools_to_execute": ["db_query_tool"],
                "vendor_type": "wedding_hall",
                "region_keyword": "강남",
                "user_memo": {
                    "profile": {"total_budget_manwon": 3000}
                }
            }
        },
        {
            "name": "예산 계산 + DB 조회",
            "state": {
                "tools_to_execute": ["calculator_tool", "db_query_tool"],
                "user_input": "총 예산 계산해주세요",
                "user_memo": {
                    "profile": {
                        "total_budget_manwon": 5000,
                        "guest_count": 150,
                        "wedding_date": "2025-10-15"
                    }
                }
            }
        },
        {
            "name": "사용자 정보 업데이트",
            "state": {
                "tools_to_execute": ["user_db_update_tool"],
                "update_type": "wedding_date",
                "user_input": "결혼식 날짜를 크리스마스로 바꿔주세요",
                "user_memo": {
                    "profile": {"user_id": "test_user", "wedding_date": None}
                }
            }
        },
        {
            "name": "복합 툴 실행 (DB + 웹검색)",
            "state": {
                "tools_to_execute": ["db_query_tool", "web_search_tool"],
                "vendor_type": "studio",
                "region_keyword": "성수동",
                "user_memo": {"profile": {}}
            }
        }
    ]
    
    print("🧪 tool_execution_node 테스트 시작\n")
    
    for i, test in enumerate(test_cases, 1):
        print(f"{'='*60}")
        print(f"테스트 {i}: {test['name']}")
        print(f"실행할 툴: {test['state']['tools_to_execute']}")
        print("-" * 40)
        
        # tool_execution_node 실행
        result_state = tool_execution_node(test['state'])
        
        # 결과 분석
        tool_results = result_state.get('tool_results', {})
        successful_tools = result_state.get('successful_tools', [])
        failed_tools = result_state.get('failed_tools', [])
        
        print(f"\n📊 실행 결과:")
        print(f"✅ 성공한 툴: {successful_tools}")
        if failed_tools:
            print(f"❌ 실패한 툴: {failed_tools}")
        
        # 각 툴별 결과 요약
        for tool_name, result in tool_results.items():
            success = result.get('success', False)
            message = result.get('message', result.get('error', 'No message'))
            
            status_icon = "✅" if success else "❌"
            print(f"{status_icon} {tool_name}: {message}")
            
            # 데이터가 있으면 간단히 요약
            if success and result.get('data'):
                data = result['data']
                if tool_name == "db_query_tool":
                    results_count = data.get('total_count', 0)
                    print(f"   📋 검색 결과: {results_count}개")
                elif tool_name == "calculator_tool":
                    budget = data.get('total_budget_manwon', 0)
                    savings = data.get('total_potential_savings', 0)
                    print(f"   💰 총예산: {budget}만원, 절약가능: {savings}만원")
                elif tool_name == "user_db_update_tool":
                    changes = data.get('changes_made', 0)
                    print(f"   🔄 업데이트 항목: {changes}개")
                elif tool_name == "web_search_tool":
                    web_results = data.get('total_results', 0)
                    print(f"   🌐 웹검색 결과: {web_results}개")
        
        print(f"\n⏱️ 전체 상태: {result_state.get('status')}")
        print(f"📝 요약: {result_state.get('reason')}")
        print()
    
    print("🧪 tool_execution_node 테스트 완료")

# 테스트 실행
test_tool_execution_node()

🧪 tool_execution_node 테스트 시작

테스트 1: DB 조회만 실행
실행할 툴: ['db_query_tool']
----------------------------------------
🔧 실행할 툴: ['db_query_tool']
⚡ db_query_tool 실행 중...
✅ db_query_tool 완료 (0.00초)
📊 실행 완료: 성공 1, 실패 0

📊 실행 결과:
✅ 성공한 툴: ['db_query_tool']
✅ db_query_tool: Found 1 matching vendors
   📋 검색 결과: 1개

⏱️ 전체 상태: ok
📝 요약: Executed 1/1 tools successfully

테스트 2: 예산 계산 + DB 조회
실행할 툴: ['calculator_tool', 'db_query_tool']
----------------------------------------
🔧 실행할 툴: ['calculator_tool', 'db_query_tool']
⚡ calculator_tool 실행 중...
✅ calculator_tool 완료 (0.00초)
⚡ db_query_tool 실행 중...
✅ db_query_tool 완료 (0.00초)
📊 실행 완료: 성공 2, 실패 0

📊 실행 결과:
✅ 성공한 툴: ['calculator_tool', 'db_query_tool']
✅ calculator_tool: Budget calculation completed for 5000만원 total budget
   💰 총예산: 5000만원, 절약가능: 825만원
✅ db_query_tool: Found 1 matching vendors
   📋 검색 결과: 1개

⏱️ 전체 상태: ok
📝 요약: Executed 2/2 tools successfully

테스트 3: 사용자 정보 업데이트
실행할 툴: ['user_db_update_tool']
----------------------------------------
🔧 실행할

In [40]:
# Cell 9: memo_update_node 구현

def memo_update_node(state: dict) -> dict:
    """
    Intelligent memory update node that processes tool execution results and updates user memory.
    
    This node serves as the central memory management system that:
    - Analyzes tool execution results and extracts actionable insights
    - Updates user profile with new information (wedding date, budget, preferences)
    - Maintains search history and vendor interaction patterns
    - Tracks decision-making progress and milestone completions
    - Ensures memory consistency and validates data integrity
    - Manages memory versioning and backup for recovery scenarios
    
    Memory Update Strategies:
    - Profile Updates: Direct updates to user demographics and preferences
    - Search Pattern Learning: Tracks user search behaviors and refines recommendations
    - Decision Tracking: Records confirmed choices and eliminates outdated options
    - Context Enrichment: Adds contextual information from successful tool executions
    - Smart Deduplication: Prevents redundant information while preserving user intent
    
    The node implements intelligent merge strategies to handle conflicting information
    and maintains a complete audit trail of all memory modifications for transparency.
    
    Args:
        state (dict): State containing tool_results, user_memo, and execution context
        
    Returns:
        dict: Updated state with refreshed user_memo and update metadata
    """
    
    # Extract required information from state
    tool_results = state.get('tool_results', {})
    user_memo = state.get('user_memo', {})
    memo_file_path = state.get('memo_file_path')
    user_id = state.get('user_id')
    
    # Initialize tracking variables
    updates_made = []
    backup_created = False
    update_summary = {}
    
    try:
        # Validate inputs
        if not user_id:
            state['status'] = "error"
            state['reason'] = "No user_id provided for memory update"
            return state
            
        if not user_memo:
            state['status'] = "error" 
            state['reason'] = "No user_memo found in state"
            return state
        
        # Create backup before making changes
        if memo_file_path and os.path.exists(memo_file_path):
            backup_path = f"{memo_file_path}.backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
            try:
                with open(memo_file_path, 'r', encoding='utf-8') as f:
                    backup_data = f.read()
                with open(backup_path, 'w', encoding='utf-8') as f:
                    f.write(backup_data)
                backup_created = True
                print(f"🔄 Memory backup created: {backup_path}")
            except Exception as e:
                print(f"⚠️ Failed to create backup: {str(e)}")
        
        # Initialize update tracking
        updated_memo = deepcopy(user_memo)
        
        # Process each tool result
        for tool_name, tool_result in tool_results.items():
            if not tool_result.get('success', False):
                continue
                
            tool_data = tool_result.get('data', {})
            
            # Process different types of tool results
            if tool_name == "user_db_update_tool":
                updates = _process_user_update_result(tool_data, updated_memo)
                updates_made.extend(updates)
                
            elif tool_name == "db_query_tool":
                updates = _process_db_query_result(tool_data, updated_memo)
                updates_made.extend(updates)
                
            elif tool_name == "web_search_tool":
                updates = _process_web_search_result(tool_data, updated_memo)
                updates_made.extend(updates)
                
            elif tool_name == "calculator_tool":
                updates = _process_calculator_result(tool_data, updated_memo)
                updates_made.extend(updates)
        
        # Update conversation summary if significant changes were made
        if updates_made:
            _update_conversation_summary(updated_memo, updates_made, state)
        
        # Add metadata to updated memo
        _add_update_metadata(updated_memo, updates_made, tool_results)
        
        # Save updated memory to file
        if memo_file_path:
            try:
                os.makedirs(os.path.dirname(memo_file_path), exist_ok=True)
                with open(memo_file_path, 'w', encoding='utf-8') as f:
                    json.dump(updated_memo, f, ensure_ascii=False, indent=2)
                    
                print(f"💾 Memory updated successfully: {len(updates_made)} changes")
                
            except Exception as e:
                state['status'] = "error"
                state['reason'] = f"Failed to save updated memory: {str(e)}"
                return state
        
        # Update state with new memory
        state['user_memo'] = updated_memo
        state['memo_updates_made'] = updates_made
        state['memo_backup_created'] = backup_created
        state['status'] = "ok"
        
        # Generate update summary for response
        update_summary = {
            'total_updates': len(updates_made),
            'profile_updates': len([u for u in updates_made if u.get('category') == 'profile']),
            'preference_updates': len([u for u in updates_made if u.get('category') == 'preferences']),
            'search_history_updates': len([u for u in updates_made if u.get('category') == 'search_history']),
            'backup_created': backup_created
        }
        
        state['update_summary'] = update_summary
        
        if updates_made:
            print(f"✅ Memory update completed: {update_summary}")
        else:
            print("ℹ️ No memory updates required")
            
        return state
        
    except Exception as e:
        state['status'] = "error"
        state['reason'] = f"Memory update failed: {str(e)}"
        print(f"❌ Memory update error: {str(e)}")
        return state


def _process_user_update_result(tool_data: dict, user_memo: dict) -> List[dict]:
    """Process user database update tool results."""
    updates = []
    
    if 'updated_fields' in tool_data:
        profile = user_memo.setdefault('profile', {})
        
        for field, new_value in tool_data['updated_fields'].items():
            old_value = profile.get(field)
            if old_value != new_value:
                profile[field] = new_value
                updates.append({
                    'category': 'profile',
                    'field': field,
                    'old_value': old_value,
                    'new_value': new_value,
                    'timestamp': datetime.now().isoformat()
                })
    
    return updates


def _process_db_query_result(tool_data: dict, user_memo: dict) -> List[dict]:
    """Process database query tool results."""
    updates = []
    
    # Track search patterns
    if 'query_type' in tool_data and 'results_count' in tool_data:
        search_history = user_memo.setdefault('search_history', [])
        
        search_entry = {
            'query_type': tool_data['query_type'],
            'results_count': tool_data['results_count'],
            'timestamp': datetime.now().isoformat(),
            'vendor_type': tool_data.get('vendor_type'),
            'region': tool_data.get('region')
        }
        
        search_history.append(search_entry)
        
        # Keep only last 50 searches
        if len(search_history) > 50:
            search_history[:] = search_history[-50:]
            
        updates.append({
            'category': 'search_history',
            'action': 'add_search',
            'data': search_entry,
            'timestamp': datetime.now().isoformat()
        })
    
    return updates


def _process_web_search_result(tool_data: dict, user_memo: dict) -> List[dict]:
    """Process web search tool results."""
    updates = []
    
    # Extract useful information from web search results
    if 'search_query' in tool_data and 'results' in tool_data:
        preferences = user_memo.setdefault('preferences', {})
        external_insights = preferences.setdefault('external_insights', [])
        
        insight_entry = {
            'source': 'web_search',
            'query': tool_data['search_query'],
            'results_summary': tool_data.get('summary', ''),
            'timestamp': datetime.now().isoformat()
        }
        
        external_insights.append(insight_entry)
        
        # Keep only last 20 insights
        if len(external_insights) > 20:
            external_insights[:] = external_insights[-20:]
            
        updates.append({
            'category': 'preferences',
            'action': 'add_external_insight',
            'data': insight_entry,
            'timestamp': datetime.now().isoformat()
        })
    
    return updates


def _process_calculator_result(tool_data: dict, user_memo: dict) -> List[dict]:
    """Process calculator tool results."""
    updates = []
    
    # Update budget calculations if relevant
    if 'calculation_type' in tool_data and 'result' in tool_data:
        calc_type = tool_data['calculation_type']
        
        if calc_type in ['budget_calculation', 'cost_estimation']:
            profile = user_memo.setdefault('profile', {})
            calculations = profile.setdefault('budget_calculations', [])
            
            calc_entry = {
                'type': calc_type,
                'result': tool_data['result'],
                'inputs': tool_data.get('inputs', {}),
                'timestamp': datetime.now().isoformat()
            }
            
            calculations.append(calc_entry)
            
            # Keep only last 10 calculations
            if len(calculations) > 10:
                calculations[:] = calculations[-10:]
                
            updates.append({
                'category': 'profile',
                'action': 'add_calculation',
                'data': calc_entry,
                'timestamp': datetime.now().isoformat()
            })
    
    return updates


def _update_conversation_summary(user_memo: dict, updates_made: List[dict], state: dict):
    """Update conversation summary based on recent changes."""
    
    current_summary = user_memo.get('conversation_summary', '')
    user_input = state.get('user_input', '')
    
    # Create simple summary of this interaction
    if updates_made and user_input:
        new_entry = f"[{datetime.now().strftime('%Y-%m-%d %H:%M')}] User: {user_input[:100]}... "
        new_entry += f"Updates: {len(updates_made)} changes made"
        
        if current_summary:
            # Append to existing summary, keep reasonable length
            summary_lines = current_summary.split('\n')
            summary_lines.append(new_entry)
            
            # Keep only last 10 interactions
            if len(summary_lines) > 10:
                summary_lines = summary_lines[-10:]
                
            user_memo['conversation_summary'] = '\n'.join(summary_lines)
        else:
            user_memo['conversation_summary'] = new_entry


def _add_update_metadata(user_memo: dict, updates_made: List[dict], tool_results: dict):
    """Add metadata about the update process."""
    
    metadata = user_memo.setdefault('metadata', {})
    
    metadata.update({
        'last_updated': datetime.now().isoformat(),
        'total_updates_count': metadata.get('total_updates_count', 0) + len(updates_made),
        'last_tool_execution': {
            'tools_used': list(tool_results.keys()),
            'successful_tools': [name for name, result in tool_results.items() 
                               if result.get('success', False)],
            'timestamp': datetime.now().isoformat()
        }
    })
    
    # Update version if significant changes
    if len(updates_made) > 0:
        current_version = user_memo.get('version', '1.0')
        try:
            version_parts = current_version.split('.')
            minor_version = int(version_parts[1]) + 1
            user_memo['version'] = f"{version_parts[0]}.{minor_version}"
        except (IndexError, ValueError):
            user_memo['version'] = "1.1"

In [41]:
def test_memo_update_node():
    """Comprehensive test suite for memo_update_node functionality."""
    
    print("🧪 memo_update_node 테스트 시작\n")
    
    # Create temporary directory for testing
    with tempfile.TemporaryDirectory() as temp_dir:
        
        test_cases = [
            {
                "name": "사용자 정보 업데이트 테스트",
                "scenario": "user_profile_update",
                "state": _create_test_state_user_update(temp_dir)
            },
            {
                "name": "DB 쿼리 결과 처리 테스트", 
                "scenario": "db_query_processing",
                "state": _create_test_state_db_query(temp_dir)
            },
            {
                "name": "웹 검색 결과 처리 테스트",
                "scenario": "web_search_processing", 
                "state": _create_test_state_web_search(temp_dir)
            },
            {
                "name": "계산 결과 처리 테스트",
                "scenario": "calculator_processing",
                "state": _create_test_state_calculator(temp_dir)
            },
            {
                "name": "복합 도구 결과 처리 테스트",
                "scenario": "multiple_tools",
                "state": _create_test_state_multiple_tools(temp_dir)
            },
            {
                "name": "에러 상황 처리 테스트",
                "scenario": "error_handling",
                "state": _create_test_state_error_cases(temp_dir)
            }
        ]
        
        for i, test_case in enumerate(test_cases, 1):
            print(f"{'='*60}")
            print(f"테스트 {i}: {test_case['name']}")
            print(f"시나리오: {test_case['scenario']}")
            print("-" * 40)
            
            # Execute memo_update_node
            try:
                initial_state = test_case['state'].copy()
                result_state = memo_update_node(test_case['state'])
                
                # Analyze results
                _analyze_test_results(test_case, initial_state, result_state)
                
            except Exception as e:
                print(f"❌ 테스트 실행 중 오류: {str(e)}")
                
            print("-" * 60)
            print()
    
    print("🧪 memo_update_node 테스트 완료")


def _create_test_state_user_update(temp_dir: str) -> dict:
    """Create test state for user profile update scenario."""
    
    user_id = "test_user_001"
    memo_file = os.path.join(temp_dir, f"user_{user_id}_memo.json")
    
    # Create initial user memo
    initial_memo = {
        "version": "1.0",
        "profile": {
            "user_id": user_id,
            "name": "김철수",
            "wedding_date": None,
            "total_budget_manwon": None,
            "guest_count": None
        },
        "preferences": {
            "style_preferences": [],
            "location_preferences": ["강남구"]
        },
        "search_history": [],
        "conversation_summary": "",
        "metadata": {
            "created_at": datetime.now().isoformat(),
            "total_updates_count": 0
        }
    }
    
    # Save initial memo
    with open(memo_file, 'w', encoding='utf-8') as f:
        json.dump(initial_memo, f, ensure_ascii=False, indent=2)
    
    return {
        "user_id": user_id,
        "memo_file_path": memo_file,
        "user_memo": initial_memo,
        "user_input": "결혼식 날짜를 2025년 10월 15일로 설정하고 예산은 5000만원으로 해주세요",
        "tool_results": {
            "user_db_update_tool": {
                "success": True,
                "data": {
                    "updated_fields": {
                        "wedding_date": "2025-10-15",
                        "total_budget_manwon": 5000
                    },
                    "operation": "profile_update"
                },
                "execution_time": datetime.now().isoformat()
            }
        }
    }


def _create_test_state_db_query(temp_dir: str) -> dict:
    """Create test state for database query result processing."""
    
    user_id = "test_user_002"
    memo_file = os.path.join(temp_dir, f"user_{user_id}_memo.json")
    
    initial_memo = {
        "version": "1.0", 
        "profile": {"user_id": user_id},
        "preferences": {},
        "search_history": [],
        "metadata": {"total_updates_count": 0}
    }
    
    with open(memo_file, 'w', encoding='utf-8') as f:
        json.dump(initial_memo, f, ensure_ascii=False, indent=2)
    
    return {
        "user_id": user_id,
        "memo_file_path": memo_file,
        "user_memo": initial_memo,
        "user_input": "강남 스튜디오 추천해줘",
        "tool_results": {
            "db_query_tool": {
                "success": True,
                "data": {
                    "query_type": "vendor_search",
                    "vendor_type": "studio",
                    "region": "강남구",
                    "results_count": 15,
                    "results": [
                        {"name": "스튜디오A", "rating": 4.5},
                        {"name": "스튜디오B", "rating": 4.2}
                    ]
                }
            }
        }
    }


def _create_test_state_web_search(temp_dir: str) -> dict:
    """Create test state for web search result processing."""
    
    user_id = "test_user_003"
    memo_file = os.path.join(temp_dir, f"user_{user_id}_memo.json")
    
    initial_memo = {
        "version": "1.0",
        "profile": {"user_id": user_id},
        "preferences": {},
        "metadata": {"total_updates_count": 0}
    }
    
    with open(memo_file, 'w', encoding='utf-8') as f:
        json.dump(initial_memo, f, ensure_ascii=False, indent=2)
    
    return {
        "user_id": user_id,
        "memo_file_path": memo_file,
        "user_memo": initial_memo,
        "user_input": "최신 웨딩 트렌드 알려줘",
        "tool_results": {
            "web_search_tool": {
                "success": True,
                "data": {
                    "search_query": "2025 웨딩 트렌드",
                    "results": [
                        {"title": "2025년 웨딩 트렌드", "url": "example.com"},
                        {"title": "요즘 인기 웨딩 스타일", "url": "example2.com"}
                    ],
                    "summary": "미니멀 스타일과 야외 웨딩이 인기"
                }
            }
        }
    }


def _create_test_state_calculator(temp_dir: str) -> dict:
    """Create test state for calculator result processing."""
    
    user_id = "test_user_004"
    memo_file = os.path.join(temp_dir, f"user_{user_id}_memo.json")
    
    initial_memo = {
        "version": "1.0",
        "profile": {"user_id": user_id, "guest_count": 100},
        "preferences": {},
        "metadata": {"total_updates_count": 0}
    }
    
    with open(memo_file, 'w', encoding='utf-8') as f:
        json.dump(initial_memo, f, ensure_ascii=False, indent=2)
    
    return {
        "user_id": user_id,
        "memo_file_path": memo_file,
        "user_memo": initial_memo,
        "user_input": "하객 100명 기준으로 예상 비용 계산해줘",
        "tool_results": {
            "calculator_tool": {
                "success": True,
                "data": {
                    "calculation_type": "budget_calculation",
                    "result": 4500,
                    "inputs": {
                        "guest_count": 100,
                        "cost_per_guest": 45
                    },
                    "breakdown": {
                        "venue": 2000,
                        "catering": 2000,
                        "misc": 500
                    }
                }
            }
        }
    }


def _create_test_state_multiple_tools(temp_dir: str) -> dict:
    """Create test state for multiple tool results processing."""
    
    user_id = "test_user_005"
    memo_file = os.path.join(temp_dir, f"user_{user_id}_memo.json")
    
    initial_memo = {
        "version": "1.0",
        "profile": {"user_id": user_id},
        "preferences": {},
        "search_history": [],
        "metadata": {"total_updates_count": 0}
    }
    
    with open(memo_file, 'w', encoding='utf-8') as f:
        json.dump(initial_memo, f, ensure_ascii=False, indent=2)
    
    return {
        "user_id": user_id,
        "memo_file_path": memo_file,
        "user_memo": initial_memo,
        "user_input": "예산 3000만원으로 설정하고 강남 웨딩홀 찾아줘",
        "tool_results": {
            "user_db_update_tool": {
                "success": True,
                "data": {
                    "updated_fields": {"total_budget_manwon": 3000},
                    "operation": "profile_update"
                }
            },
            "db_query_tool": {
                "success": True,
                "data": {
                    "query_type": "venue_search",
                    "vendor_type": "venue",
                    "region": "강남구",
                    "results_count": 8
                }
            },
            "web_search_tool": {
                "success": True,
                "data": {
                    "search_query": "강남 웨딩홀 추천",
                    "summary": "강남 지역 프리미엄 웨딩홀 정보"
                }
            }
        }
    }


def _create_test_state_error_cases(temp_dir: str) -> dict:
    """Create test state for error handling scenarios."""
    
    return {
        "user_id": None,  # Missing user_id to trigger error
        "memo_file_path": None,
        "user_memo": {},
        "tool_results": {
            "user_db_update_tool": {
                "success": False,
                "error_message": "Database connection failed"
            }
        }
    }


def _analyze_test_results(test_case: dict, initial_state: dict, result_state: dict):
    """Analyze and display test results."""
    
    status = result_state.get('status', 'unknown')
    scenario = test_case['scenario']
    
    print(f"📊 결과 분석:")
    print(f"   상태: {status}")
    
    if status == "ok":
        print("   ✅ 성공적으로 실행됨")
        
        # Check specific updates based on scenario
        updates_made = result_state.get('memo_updates_made', [])
        update_summary = result_state.get('update_summary', {})
        
        print(f"   📝 업데이트 수: {len(updates_made)}")
        print(f"   📈 요약: {update_summary}")
        
        # Scenario-specific validation
        if scenario == "user_profile_update":
            _validate_profile_update(initial_state, result_state)
            
        elif scenario == "db_query_processing":
            _validate_search_history_update(initial_state, result_state)
            
        elif scenario == "web_search_processing":
            _validate_external_insights_update(initial_state, result_state)
            
        elif scenario == "calculator_processing":
            _validate_calculation_update(initial_state, result_state)
            
        elif scenario == "multiple_tools":
            _validate_multiple_updates(initial_state, result_state)
        
        # Check backup creation
        backup_created = result_state.get('memo_backup_created', False)
        print(f"   💾 백업 생성: {'예' if backup_created else '아니오'}")
        
        # Check version update
        old_version = initial_state.get('user_memo', {}).get('version', '1.0')
        new_version = result_state.get('user_memo', {}).get('version', '1.0')
        if old_version != new_version:
            print(f"   🔄 버전 업데이트: {old_version} → {new_version}")
            
    else:
        print("   ❌ 실행 실패")
        reason = result_state.get('reason', 'Unknown error')
        print(f"   🚨 실패 이유: {reason}")


def _validate_profile_update(initial_state: dict, result_state: dict):
    """Validate profile update test results."""
    
    initial_profile = initial_state.get('user_memo', {}).get('profile', {})
    updated_profile = result_state.get('user_memo', {}).get('profile', {})
    
    # Check if wedding_date was updated
    if updated_profile.get('wedding_date') == "2025-10-15":
        print("     ✅ 결혼일 업데이트 확인")
    else:
        print("     ❌ 결혼일 업데이트 실패")
    
    # Check if budget was updated
    if updated_profile.get('total_budget_manwon') == 5000:
        print("     ✅ 예산 업데이트 확인")
    else:
        print("     ❌ 예산 업데이트 실패")


def _validate_search_history_update(initial_state: dict, result_state: dict):
    """Validate search history update test results."""
    
    search_history = result_state.get('user_memo', {}).get('search_history', [])
    
    if len(search_history) > 0:
        latest_search = search_history[-1]
        if latest_search.get('vendor_type') == 'studio':
            print("     ✅ 검색 기록 업데이트 확인")
        else:
            print("     ❌ 검색 기록 내용 불일치")
    else:
        print("     ❌ 검색 기록 업데이트 실패")


def _validate_external_insights_update(initial_state: dict, result_state: dict):
    """Validate external insights update test results."""
    
    insights = result_state.get('user_memo', {}).get('preferences', {}).get('external_insights', [])
    
    if len(insights) > 0:
        latest_insight = insights[-1]
        if latest_insight.get('source') == 'web_search':
            print("     ✅ 외부 인사이트 업데이트 확인")
        else:
            print("     ❌ 외부 인사이트 소스 불일치")
    else:
        print("     ❌ 외부 인사이트 업데이트 실패")


def _validate_calculation_update(initial_state: dict, result_state: dict):
    """Validate calculation update test results."""
    
    calculations = result_state.get('user_memo', {}).get('profile', {}).get('budget_calculations', [])
    
    if len(calculations) > 0:
        latest_calc = calculations[-1]
        if latest_calc.get('type') == 'budget_calculation':
            print("     ✅ 계산 결과 업데이트 확인")
        else:
            print("     ❌ 계산 유형 불일치")
    else:
        print("     ❌ 계산 결과 업데이트 실패")


def _validate_multiple_updates(initial_state: dict, result_state: dict):
    """Validate multiple tool updates test results."""
    
    updated_memo = result_state.get('user_memo', {})
    updates_made = result_state.get('memo_updates_made', [])
    
    # Check profile update
    profile_updated = updated_memo.get('profile', {}).get('total_budget_manwon') == 3000
    
    # Check search history
    search_added = len(updated_memo.get('search_history', [])) > 0
    
    # Check external insights
    insights_added = len(updated_memo.get('preferences', {}).get('external_insights', [])) > 0
    
    success_count = sum([profile_updated, search_added, insights_added])
    
    print(f"     📊 다중 업데이트 결과: {success_count}/3 성공")
    if profile_updated:
        print("     ✅ 프로필 업데이트")
    if search_added:
        print("     ✅ 검색 기록 추가")
    if insights_added:
        print("     ✅ 외부 인사이트 추가")


# 테스트 실행 함수
if __name__ == "__main__":
    test_memo_update_node()

🧪 memo_update_node 테스트 시작

테스트 1: 사용자 정보 업데이트 테스트
시나리오: user_profile_update
----------------------------------------
🔄 Memory backup created: C:\Users\amy\AppData\Local\Temp\tmp_rfs_72k\user_test_user_001_memo.json.backup_20250922_150909
💾 Memory updated successfully: 2 changes
✅ Memory update completed: {'total_updates': 2, 'profile_updates': 2, 'preference_updates': 0, 'search_history_updates': 0, 'backup_created': True}
📊 결과 분석:
   상태: ok
   ✅ 성공적으로 실행됨
   📝 업데이트 수: 2
   📈 요약: {'total_updates': 2, 'profile_updates': 2, 'preference_updates': 0, 'search_history_updates': 0, 'backup_created': True}
     ✅ 결혼일 업데이트 확인
     ✅ 예산 업데이트 확인
   💾 백업 생성: 예
   🔄 버전 업데이트: 1.0 → 1.1
------------------------------------------------------------

테스트 2: DB 쿼리 결과 처리 테스트
시나리오: db_query_processing
----------------------------------------
🔄 Memory backup created: C:\Users\amy\AppData\Local\Temp\tmp_rfs_72k\user_test_user_002_memo.json.backup_20250922_150909
💾 Memory updated successfully: 1 changes
✅ Mem

In [42]:
# Cell 9: memo_update_node 구현 (개선된 버전)

def memo_update_node(state: dict) -> dict:
    """
    Intelligent memory update node that processes tool execution results and updates user memory.
    
    This node serves as the central memory management system that:
    - Analyzes tool execution results and extracts actionable insights
    - Updates user profile with new information (wedding date, budget, preferences)
    - Maintains search history and vendor interaction patterns
    - Tracks decision-making progress and milestone completions
    - Ensures memory consistency and validates data integrity
    - Manages memory versioning and change history within JSON structure
    
    Memory Update Strategies:
    - Conflict Resolution: User confirmation takes priority when existing data conflicts with new data
    - Content Preservation: Existing content is maintained when summary memo has no new information
    - Profile Updates: Direct updates to user demographics and preferences with conflict detection
    - Search Pattern Learning: Tracks user search behaviors and refines recommendations
    - Decision Tracking: Records confirmed choices and eliminates outdated options
    - Context Enrichment: Adds contextual information from successful tool executions
    - Smart Deduplication: Prevents redundant information while preserving user intent
    
    Version Management:
    - History Tracking: All changes are stored in update_history array within the JSON file
    - No External Backups: Version history is maintained as part of the main JSON structure
    - Change Append: New information is appended to existing data rather than overwriting
    - Rollback Capability: Previous versions can be accessed through update_history
    
    The node implements intelligent merge strategies with user confirmation for conflicts
    and maintains a complete audit trail within the JSON structure for transparency.
    
    Args:
        state (dict): State containing tool_results, user_memo, and execution context
        
    Returns:
        dict: Updated state with refreshed user_memo, conflict notifications, and update metadata
    """
    
    # Extract required information from state
    tool_results = state.get('tool_results', {})
    user_memo = state.get('user_memo', {})
    memo_file_path = state.get('memo_file_path')
    user_id = state.get('user_id')
    user_input = state.get('user_input', '')
    
    # Initialize tracking variables
    updates_made = []
    conflicts_detected = []
    user_confirmations_needed = []
    update_summary = {}
    
    try:
        # Validate inputs
        if not user_id:
            state['status'] = "error"
            state['reason'] = "No user_id provided for memory update"
            return state
            
        if not user_memo:
            state['status'] = "error" 
            state['reason'] = "No user_memo found in state"
            return state
        
        # Initialize update history structure if not exists
        updated_memo = deepcopy(user_memo)
        if 'update_history' not in updated_memo:
            updated_memo['update_history'] = []
        
        # Create snapshot of current state before updates
        pre_update_snapshot = {
            'timestamp': datetime.now().isoformat(),
            'version': updated_memo.get('version', '1.0'),
            'trigger': 'memo_update_node',
            'user_input': user_input[:200] + '...' if len(user_input) > 200 else user_input,
            'profile_snapshot': deepcopy(updated_memo.get('profile', {})),
            'preferences_snapshot': deepcopy(updated_memo.get('preferences', {}))
        }
        
        # Process each tool result with conflict detection
        for tool_name, tool_result in tool_results.items():
            if not tool_result.get('success', False):
                continue
                
            tool_data = tool_result.get('data', {})
            
            # Process different types of tool results
            if tool_name == "user_db_update_tool":
                updates, conflicts = _process_user_update_with_conflicts(tool_data, updated_memo, user_input)
                updates_made.extend(updates)
                conflicts_detected.extend(conflicts)
                
            elif tool_name == "db_query_tool":
                updates = _process_db_query_result_append(tool_data, updated_memo)
                updates_made.extend(updates)
                
            elif tool_name == "web_search_tool":
                updates = _process_web_search_result_append(tool_data, updated_memo)
                updates_made.extend(updates)
                
            elif tool_name == "calculator_tool":
                updates = _process_calculator_result_append(tool_data, updated_memo)
                updates_made.extend(updates)
        
        # Handle conflicts with user confirmation priority
        if conflicts_detected:
            confirmation_result = _handle_conflicts_with_user_priority(conflicts_detected, updated_memo, state)
            user_confirmations_needed = confirmation_result['confirmations_needed']
            
            # If user confirmation is needed, mark state for user interaction
            if user_confirmations_needed:
                state['status'] = "user_confirmation_required"
                state['conflicts_detected'] = conflicts_detected
                state['confirmations_needed'] = user_confirmations_needed
                state['reason'] = "User confirmation required for conflicting information"
                return state
        
        # Update conversation summary only if new content exists
        if updates_made and _has_new_meaningful_content(updates_made, updated_memo):
            _update_conversation_summary_preserve_existing(updated_memo, updates_made, state)
        
        # Add current update to history before saving
        if updates_made:
            post_update_snapshot = pre_update_snapshot.copy()
            post_update_snapshot.update({
                'updates_made': updates_made,
                'conflicts_resolved': len(conflicts_detected),
                'post_update_profile': deepcopy(updated_memo.get('profile', {})),
                'post_update_preferences': deepcopy(updated_memo.get('preferences', {}))
            })
            
            updated_memo['update_history'].append(post_update_snapshot)
            
            # Keep only last 20 update records to prevent JSON bloat
            if len(updated_memo['update_history']) > 20:
                updated_memo['update_history'] = updated_memo['update_history'][-20:]
        
        # Update version and metadata
        _update_version_and_metadata(updated_memo, updates_made, tool_results)
        
        # Save updated memory to file
        if memo_file_path:
            try:
                os.makedirs(os.path.dirname(memo_file_path), exist_ok=True)
                with open(memo_file_path, 'w', encoding='utf-8') as f:
                    json.dump(updated_memo, f, ensure_ascii=False, indent=2)
                    
                print(f"💾 Memory updated successfully: {len(updates_made)} changes")
                
            except Exception as e:
                state['status'] = "error"
                state['reason'] = f"Failed to save updated memory: {str(e)}"
                return state
        
        # Update state with new memory
        state['user_memo'] = updated_memo
        state['memo_updates_made'] = updates_made
        state['conflicts_resolved'] = len(conflicts_detected)
        state['status'] = "ok"
        
        # Generate update summary for response
        update_summary = {
            'total_updates': len(updates_made),
            'profile_updates': len([u for u in updates_made if u.get('category') == 'profile']),
            'preference_updates': len([u for u in updates_made if u.get('category') == 'preferences']),
            'search_history_updates': len([u for u in updates_made if u.get('category') == 'search_history']),
            'conflicts_resolved': len(conflicts_detected),
            'history_entries_total': len(updated_memo.get('update_history', []))
        }
        
        state['update_summary'] = update_summary
        
        if updates_made:
            print(f"✅ Memory update completed: {update_summary}")
        else:
            print("ℹ️ No memory updates required")
            
        return state
        
    except Exception as e:
        state['status'] = "error"
        state['reason'] = f"Memory update failed: {str(e)}"
        print(f"❌ Memory update error: {str(e)}")
        return state


def _process_user_update_with_conflicts(tool_data: dict, user_memo: dict, user_input: str) -> tuple:
    """Process user database update tool results with conflict detection."""
    updates = []
    conflicts = []
    
    if 'updated_fields' in tool_data:
        profile = user_memo.setdefault('profile', {})
        
        for field, new_value in tool_data['updated_fields'].items():
            old_value = profile.get(field)
            
            # Detect conflicts (existing non-null values that differ)
            if old_value is not None and old_value != new_value:
                conflicts.append({
                    'field': field,
                    'old_value': old_value,
                    'new_value': new_value,
                    'source': 'user_input',
                    'user_input_context': user_input,
                    'conflict_type': 'value_change'
                })
                
                # For now, keep existing value until user confirms
                # This implements "user confirmation priority" strategy
                continue
            
            # No conflict - proceed with update
            if old_value != new_value:
                profile[field] = new_value
                updates.append({
                    'category': 'profile',
                    'field': field,
                    'old_value': old_value,
                    'new_value': new_value,
                    'timestamp': datetime.now().isoformat(),
                    'source': 'user_db_update_tool'
                })
    
    return updates, conflicts


def _process_db_query_result_append(tool_data: dict, user_memo: dict) -> List[dict]:
    """Process database query tool results with append-only strategy."""
    updates = []
    
    # Always append to search history, never overwrite
    if 'query_type' in tool_data and 'results_count' in tool_data:
        search_history = user_memo.setdefault('search_history', [])
        
        search_entry = {
            'query_type': tool_data['query_type'],
            'results_count': tool_data['results_count'],
            'timestamp': datetime.now().isoformat(),
            'vendor_type': tool_data.get('vendor_type'),
            'region': tool_data.get('region'),
            'search_id': f"search_{len(search_history) + 1}_{int(datetime.now().timestamp())}"
        }
        
        search_history.append(search_entry)
        
        # Keep only last 100 searches to manage JSON size
        if len(search_history) > 100:
            search_history[:] = search_history[-100:]
            
        updates.append({
            'category': 'search_history',
            'action': 'append_search',
            'data': search_entry,
            'timestamp': datetime.now().isoformat()
        })
    
    return updates


def _process_web_search_result_append(tool_data: dict, user_memo: dict) -> List[dict]:
    """Process web search tool results with append-only strategy."""
    updates = []
    
    # Append external insights, check for meaningful new content
    if 'search_query' in tool_data and 'results' in tool_data:
        preferences = user_memo.setdefault('preferences', {})
        external_insights = preferences.setdefault('external_insights', [])
        
        # Check if this insight adds new information
        search_summary = tool_data.get('summary', '').strip()
        if not search_summary:
            return updates  # No new meaningful content
        
        # Check for duplicates based on search query and summary similarity
        is_duplicate = any(
            existing.get('query', '').lower() == tool_data['search_query'].lower() or
            existing.get('results_summary', '').lower() == search_summary.lower()
            for existing in external_insights[-5:]  # Check last 5 entries
        )
        
        if not is_duplicate:
            insight_entry = {
                'source': 'web_search',
                'query': tool_data['search_query'],
                'results_summary': search_summary,
                'timestamp': datetime.now().isoformat(),
                'insight_id': f"insight_{len(external_insights) + 1}_{int(datetime.now().timestamp())}"
            }
            
            external_insights.append(insight_entry)
            
            # Keep only last 30 insights to manage JSON size
            if len(external_insights) > 30:
                external_insights[:] = external_insights[-30:]
                
            updates.append({
                'category': 'preferences',
                'action': 'append_external_insight',
                'data': insight_entry,
                'timestamp': datetime.now().isoformat()
            })
    
    return updates


def _process_calculator_result_append(tool_data: dict, user_memo: dict) -> List[dict]:
    """Process calculator tool results with append-only strategy."""
    updates = []
    
    # Append budget calculations, never overwrite
    if 'calculation_type' in tool_data and 'result' in tool_data:
        calc_type = tool_data['calculation_type']
        
        if calc_type in ['budget_calculation', 'cost_estimation']:
            profile = user_memo.setdefault('profile', {})
            calculations = profile.setdefault('budget_calculations', [])
            
            calc_entry = {
                'type': calc_type,
                'result': tool_data['result'],
                'inputs': tool_data.get('inputs', {}),
                'timestamp': datetime.now().isoformat(),
                'calculation_id': f"calc_{len(calculations) + 1}_{int(datetime.now().timestamp())}"
            }
            
            calculations.append(calc_entry)
            
            # Keep only last 20 calculations to manage JSON size
            if len(calculations) > 20:
                calculations[:] = calculations[-20:]
                
            updates.append({
                'category': 'profile',
                'action': 'append_calculation',
                'data': calc_entry,
                'timestamp': datetime.now().isoformat()
            })
    
    return updates


def _handle_conflicts_with_user_priority(conflicts: List[dict], user_memo: dict, state: dict) -> dict:
    """Handle conflicts by prioritizing user confirmation."""
    
    confirmations_needed = []
    
    for conflict in conflicts:
        # Create user confirmation request
        confirmation = {
            'field': conflict['field'],
            'current_value': conflict['old_value'],
            'proposed_value': conflict['new_value'],
            'user_input_context': conflict.get('user_input_context', ''),
            'confirmation_message': f"기존 {conflict['field']} 값 '{conflict['old_value']}'을(를) '{conflict['new_value']}'(으)로 변경하시겠습니까?",
            'conflict_id': f"conflict_{conflict['field']}_{int(datetime.now().timestamp())}"
        }
        
        confirmations_needed.append(confirmation)
    
    return {
        'confirmations_needed': confirmations_needed,
        'conflicts_processed': len(conflicts)
    }


def _has_new_meaningful_content(updates_made: List[dict], user_memo: dict) -> bool:
    """Check if updates contain new meaningful content worth summarizing."""
    
    if not updates_made:
        return False
    
    # Consider content meaningful if:
    # 1. Profile information was updated
    # 2. New search patterns emerged
    # 3. External insights were added
    # 4. Calculations were performed
    
    meaningful_categories = ['profile', 'preferences', 'search_history']
    meaningful_updates = [u for u in updates_made if u.get('category') in meaningful_categories]
    
    return len(meaningful_updates) > 0


def _update_conversation_summary_preserve_existing(user_memo: dict, updates_made: List[dict], state: dict):
    """Update conversation summary while preserving existing content."""
    
    current_summary = user_memo.get('conversation_summary', '').strip()
    user_input = state.get('user_input', '')
    
    if updates_made and user_input:
        # Create concise summary of this interaction
        update_types = list(set(u.get('category', 'unknown') for u in updates_made))
        new_entry = f"[{datetime.now().strftime('%m-%d %H:%M')}] {user_input[:80]}{'...' if len(user_input) > 80 else ''} → {', '.join(update_types)} 업데이트"
        
        if current_summary:
            # Append to existing summary
            summary_lines = current_summary.split('\n')
            summary_lines.append(new_entry)
            
            # Keep only last 15 interactions to manage JSON size
            if len(summary_lines) > 15:
                summary_lines = summary_lines[-15:]
                
            user_memo['conversation_summary'] = '\n'.join(summary_lines)
        else:
            user_memo['conversation_summary'] = new_entry


def _update_version_and_metadata(user_memo: dict, updates_made: List[dict], tool_results: dict):
    """Update version and metadata within JSON structure."""
    
    metadata = user_memo.setdefault('metadata', {})
    
    metadata.update({
        'last_updated': datetime.now().isoformat(),
        'total_updates_count': metadata.get('total_updates_count', 0) + len(updates_made),
        'last_tool_execution': {
            'tools_used': list(tool_results.keys()),
            'successful_tools': [name for name, result in tool_results.items() 
                               if result.get('success', False)],
            'timestamp': datetime.now().isoformat()
        }
    })
    
    # Update version if significant changes
    if len(updates_made) > 0:
        current_version = user_memo.get('version', '1.0')
        try:
            version_parts = current_version.split('.')
            minor_version = int(version_parts[1]) + 1
            user_memo['version'] = f"{version_parts[0]}.{minor_version}"
        except (IndexError, ValueError):
            user_memo['version'] = "1.1"

In [43]:
# Cell 10: memo_update_node 테스트 (개선된 버전)

def test_memo_update_node():
    """Comprehensive test suite for enhanced memo_update_node functionality."""
    
    print("🧪 memo_update_node 개선 버전 테스트 시작\n")
    
    # Create temporary directory for testing
    with tempfile.TemporaryDirectory() as temp_dir:
        
        test_cases = [
            {
                "name": "충돌 없는 신규 정보 업데이트",
                "scenario": "no_conflict_update",
                "state": _create_test_state_no_conflict(temp_dir)
            },
            {
                "name": "기존 정보와 충돌 발생 (사용자 확인 필요)",
                "scenario": "conflict_detection", 
                "state": _create_test_state_with_conflict(temp_dir)
            },
            {
                "name": "검색 이력 Append 테스트",
                "scenario": "search_history_append",
                "state": _create_test_state_search_append(temp_dir)
            },
            {
                "name": "기존 내용 보존 테스트 (새 내용 없음)",
                "scenario": "content_preservation",
                "state": _create_test_state_preserve_content(temp_dir)
            },
            {
                "name": "JSON 내부 이력 관리 테스트",
                "scenario": "history_management",
                "state": _create_test_state_history_tracking(temp_dir)
            },
            {
                "name": "중복 방지 및 스마트 Append 테스트",
                "scenario": "smart_deduplication",
                "state": _create_test_state_deduplication(temp_dir)
            }
        ]
        
        for i, test_case in enumerate(test_cases, 1):
            print(f"{'='*60}")
            print(f"테스트 {i}: {test_case['name']}")
            print(f"시나리오: {test_case['scenario']}")
            print("-" * 40)
            
            # Execute memo_update_node
            try:
                initial_state = test_case['state'].copy()
                result_state = memo_update_node(test_case['state'])
                
                # Analyze results based on scenario
                _analyze_enhanced_test_results(test_case, initial_state, result_state)
                
            except Exception as e:
                print(f"❌ 테스트 실행 중 오류: {str(e)}")
                
            print("-" * 60)
            print()
    
    print("🧪 memo_update_node 개선 버전 테스트 완료")


def _create_test_state_no_conflict(temp_dir: str) -> dict:
    """Create test state for no-conflict update scenario."""
    
    user_id = "test_user_001"
    memo_file = os.path.join(temp_dir, f"user_{user_id}_memo.json")
    
    # Create initial user memo with empty fields
    initial_memo = {
        "version": "1.0",
        "profile": {
            "user_id": user_id,
            "name": "김철수",
            "wedding_date": None,  # Empty field - no conflict expected
            "total_budget_manwon": None,  # Empty field - no conflict expected
            "guest_count": None
        },
        "preferences": {},
        "search_history": [],
        "conversation_summary": "",
        "update_history": [],
        "metadata": {
            "created_at": datetime.now().isoformat(),
            "total_updates_count": 0
        }
    }
    
    with open(memo_file, 'w', encoding='utf-8') as f:
        json.dump(initial_memo, f, ensure_ascii=False, indent=2)
    
    return {
        "user_id": user_id,
        "memo_file_path": memo_file,
        "user_memo": initial_memo,
        "user_input": "결혼식 날짜를 2025년 10월 15일로 설정하고 예산은 5000만원으로 해주세요",
        "tool_results": {
            "user_db_update_tool": {
                "success": True,
                "data": {
                    "updated_fields": {
                        "wedding_date": "2025-10-15",
                        "total_budget_manwon": 5000
                    },
                    "operation": "profile_update"
                }
            }
        }
    }


def _create_test_state_with_conflict(temp_dir: str) -> dict:
    """Create test state for conflict detection scenario."""
    
    user_id = "test_user_002"
    memo_file = os.path.join(temp_dir, f"user_{user_id}_memo.json")
    
    # Create initial memo with existing conflicting data
    initial_memo = {
        "version": "1.2",
        "profile": {
            "user_id": user_id,
            "name": "이영희",
            "wedding_date": "2025-09-20",  # Existing date - will conflict
            "total_budget_manwon": 3000,   # Existing budget - will conflict
            "guest_count": 100
        },
        "update_history": [
            {
                "timestamp": "2024-01-01T10:00:00",
                "updates_made": [{"field": "wedding_date", "old_value": None, "new_value": "2025-09-20"}]
            }
        ],
        "metadata": {"total_updates_count": 5}
    }
    
    with open(memo_file, 'w', encoding='utf-8') as f:
        json.dump(initial_memo, f, ensure_ascii=False, indent=2)
    
    return {
        "user_id": user_id,
        "memo_file_path": memo_file,
        "user_memo": initial_memo,
        "user_input": "결혼식을 11월 1일로 변경하고 예산을 4500만원으로 늘려주세요",
        "tool_results": {
            "user_db_update_tool": {
                "success": True,
                "data": {
                    "updated_fields": {
                        "wedding_date": "2025-11-01",  # Conflicts with existing
                        "total_budget_manwon": 4500    # Conflicts with existing
                    }
                }
            }
        }
    }


def _create_test_state_search_append(temp_dir: str) -> dict:
    """Create test state for search history append scenario."""
    
    user_id = "test_user_003"
    memo_file = os.path.join(temp_dir, f"user_{user_id}_memo.json")
    
    initial_memo = {
        "version": "1.0",
        "profile": {"user_id": user_id},
        "search_history": [
            {
                "query_type": "venue_search",
                "vendor_type": "venue",
                "region": "강남구",
                "timestamp": "2024-01-01T10:00:00",
                "search_id": "search_1_1704067200"
            }
        ],
        "update_history": [],
        "metadata": {"total_updates_count": 1}
    }
    
    with open(memo_file, 'w', encoding='utf-8') as f:
        json.dump(initial_memo, f, ensure_ascii=False, indent=2)
    
    return {
        "user_id": user_id,
        "memo_file_path": memo_file,
        "user_memo": initial_memo,
        "user_input": "홍대 스튜디오 추천해줘",
        "tool_results": {
            "db_query_tool": {
                "success": True,
                "data": {
                    "query_type": "vendor_search",
                    "vendor_type": "studio",
                    "region": "홍대",
                    "results_count": 12
                }
            }
        }
    }


def _create_test_state_preserve_content(temp_dir: str) -> dict:
    """Create test state for content preservation scenario."""
    
    user_id = "test_user_004"
    memo_file = os.path.join(temp_dir, f"user_{user_id}_memo.json")
    
    initial_memo = {
        "version": "1.3",
        "profile": {"user_id": user_id},
        "conversation_summary": "기존 대화 내용이 여기에 있습니다.",
        "preferences": {
            "external_insights": [
                {
                    "source": "web_search",
                    "query": "웨딩 트렌드",
                    "results_summary": "기존 인사이트 내용"
                }
            ]
        },
        "update_history": [],
        "metadata": {"total_updates_count": 3}
    }
    
    with open(memo_file, 'w', encoding='utf-8') as f:
        json.dump(initial_memo, f, ensure_ascii=False, indent=2)
    
    return {
        "user_id": user_id,
        "memo_file_path": memo_file,
        "user_memo": initial_memo,
        "user_input": "안녕하세요",  # No meaningful content
        "tool_results": {
            "web_search_tool": {
                "success": True,
                "data": {
                    "search_query": "일반 인사",
                    "results": [],
                    "summary": ""  # Empty summary - should preserve existing content
                }
            }
        }
    }


def _create_test_state_history_tracking(temp_dir: str) -> dict:
    """Create test state for history management scenario."""
    
    user_id = "test_user_005"
    memo_file = os.path.join(temp_dir, f"user_{user_id}_memo.json")
    
    # Create memo with existing history
    initial_memo = {
        "version": "1.5",
        "profile": {"user_id": user_id, "guest_count": 80},
        "update_history": [
            {
                "timestamp": "2024-01-01T10:00:00",
                "version": "1.4",
                "updates_made": [{"field": "guest_count", "old_value": None, "new_value": 80}]
            }
        ],
        "metadata": {"total_updates_count": 1}
    }
    
    with open(memo_file, 'w', encoding='utf-8') as f:
        json.dump(initial_memo, f, ensure_ascii=False, indent=2)
    
    return {
        "user_id": user_id,
        "memo_file_path": memo_file,
        "user_memo": initial_memo,
        "user_input": "하객 수 기준으로 예상 비용 계산해주세요",
        "tool_results": {
            "calculator_tool": {
                "success": True,
                "data": {
                    "calculation_type": "budget_calculation",
                    "result": 3600,
                    "inputs": {"guest_count": 80, "cost_per_guest": 45}
                }
            }
        }
    }


def _create_test_state_deduplication(temp_dir: str) -> dict:
    """Create test state for smart deduplication scenario."""
    
    user_id = "test_user_006"
    memo_file = os.path.join(temp_dir, f"user_{user_id}_memo.json")
    
    initial_memo = {
        "version": "1.1",
        "profile": {"user_id": user_id},
        "preferences": {
            "external_insights": [
                {
                    "source": "web_search",
                    "query": "웨딩 트렌드",
                    "results_summary": "미니멀 스타일이 인기",
                    "timestamp": "2024-01-01T10:00:00"
                }
            ]
        },
        "update_history": [],
        "metadata": {"total_updates_count": 1}
    }
    
    with open(memo_file, 'w', encoding='utf-8') as f:
        json.dump(initial_memo, f, ensure_ascii=False, indent=2)
    
    return {
        "user_id": user_id,
        "memo_file_path": memo_file,
        "user_memo": initial_memo,
        "user_input": "웨딩 트렌드 다시 검색해주세요",
        "tool_results": {
            "web_search_tool": {
                "success": True,
                "data": {
                    "search_query": "웨딩 트렌드",  # Same query - should detect duplicate
                    "results": [{"title": "웨딩 트렌드 2025"}],
                    "summary": "미니멀 스타일이 인기"  # Same summary - should detect duplicate
                }
            }
        }
    }


def _analyze_enhanced_test_results(test_case: dict, initial_state: dict, result_state: dict):
    """Analyze and display enhanced test results."""
    
    status = result_state.get('status', 'unknown')
    scenario = test_case['scenario']
    
    print(f"📊 결과 분석:")
    print(f"   상태: {status}")
    
    if status == "ok":
        print("   ✅ 성공적으로 실행됨")
        _validate_scenario_specific_results(scenario, initial_state, result_state)
        
    elif status == "user_confirmation_required":
        print("   🔄 사용자 확인 필요")
        _validate_conflict_handling(initial_state, result_state)
        
    else:
        print("   ❌ 실행 실패")
        reason = result_state.get('reason', 'Unknown error')
        print(f"   🚨 실패 이유: {reason}")
    
    # Check JSON structure integrity
    _validate_json_structure(result_state)


def _validate_scenario_specific_results(scenario: str, initial_state: dict, result_state: dict):
    """Validate results based on specific scenario."""
    
    updated_memo = result_state.get('user_memo', {})
    updates_made = result_state.get('memo_updates_made', [])
    update_summary = result_state.get('update_summary', {})
    
    print(f"   📝 업데이트 수: {len(updates_made)}")
    print(f"   📈 요약: {update_summary}")
    
    if scenario == "no_conflict_update":
        _validate_no_conflict_update(initial_state, result_state)
        
    elif scenario == "search_history_append":
        _validate_search_append(initial_state, result_state)
        
    elif scenario == "content_preservation":
        _validate_content_preservation(initial_state, result_state)
        
    elif scenario == "history_management":
        _validate_history_tracking(initial_state, result_state)
        
    elif scenario == "smart_deduplication":
        _validate_deduplication(initial_state, result_state)


def _validate_conflict_handling(initial_state: dict, result_state: dict):
    """Validate conflict detection and handling."""
    
    conflicts = result_state.get('conflicts_detected', [])
    confirmations = result_state.get('confirmations_needed', [])
    
    print(f"   🔍 감지된 충돌: {len(conflicts)}개")
    print(f"   ❓ 필요한 확인: {len(confirmations)}개")
    
    for conf in confirmations:
        print(f"     - {conf['field']}: '{conf['current_value']}' → '{conf['proposed_value']}'")
    
    # Check that existing values were preserved
    initial_memo = initial_state.get('user_memo', {})
    final_memo = result_state.get('user_memo', {})
    
    for conflict in conflicts:
        field = conflict['field']
        initial_value = initial_memo.get('profile', {}).get(field)
        final_value = final_memo.get('profile', {}).get(field)
        
        if initial_value == final_value:
            print(f"     ✅ {field} 기존 값 보존됨")
        else:
            print(f"     ❌ {field} 값이 변경됨 (예상되지 않음)")


def _validate_no_conflict_update(initial_state: dict, result_state: dict):
    """Validate no-conflict update scenario."""
    
    initial_profile = initial_state.get('user_memo', {}).get('profile', {})
    updated_profile = result_state.get('user_memo', {}).get('profile', {})
    
    # Check updates
    if updated_profile.get('wedding_date') == "2025-10-15":
        print("     ✅ 결혼일 업데이트 확인")
    else:
        print("     ❌ 결혼일 업데이트 실패")
    
    if updated_profile.get('total_budget_manwon') == 5000:
        print("     ✅ 예산 업데이트 확인")
    else:
        print("     ❌ 예산 업데이트 실패")


def _validate_search_append(initial_state: dict, result_state: dict):
    """Validate search history append functionality."""
    
    initial_history = initial_state.get('user_memo', {}).get('search_history', [])
    updated_history = result_state.get('user_memo', {}).get('search_history', [])
    
    if len(updated_history) == len(initial_history) + 1:
        print("     ✅ 검색 이력 추가 확인")
        
        latest_search = updated_history[-1]
        if latest_search.get('vendor_type') == 'studio':
            print("     ✅ 새 검색 정보 정확함")
        else:
            print("     ❌ 새 검색 정보 부정확")
    else:
        print("     ❌ 검색 이력 추가 실패")


def _validate_content_preservation(initial_state: dict, result_state: dict):
    """Validate content preservation when no new meaningful content."""
    
    initial_summary = initial_state.get('user_memo', {}).get('conversation_summary', '')
    updated_summary = result_state.get('user_memo', {}).get('conversation_summary', '')
    
    initial_insights = initial_state.get('user_memo', {}).get('preferences', {}).get('external_insights', [])
    updated_insights = result_state.get('user_memo', {}).get('preferences', {}).get('external_insights', [])
    
    # Should preserve existing content
    if initial_summary == updated_summary:
        print("     ✅ 기존 대화 요약 보존됨")
    else:
        print("     ❌ 대화 요약이 불필요하게 변경됨")
    
    if len(updated_insights) == len(initial_insights):
        print("     ✅ 중복 인사이트 추가 방지됨")
    else:
        print("     ❌ 불필요한 인사이트 추가됨")


def _validate_history_tracking(initial_state: dict, result_state: dict):
    """Validate update history tracking in JSON."""
    
    initial_history = initial_state.get('user_memo', {}).get('update_history', [])
    updated_history = result_state.get('user_memo', {}).get('update_history', [])
    
    if len(updated_history) > len(initial_history):
        print("     ✅ 업데이트 이력 JSON에 추가됨")
        
        latest_entry = updated_history[-1]
        if 'timestamp' in latest_entry and 'updates_made' in latest_entry:
            print("     ✅ 이력 엔트리 구조 올바름")
        else:
            print("     ❌ 이력 엔트리 구조 부정확")
    else:
        print("     ❌ 업데이트 이력 추가 실패")


def _validate_deduplication(initial_state: dict, result_state: dict):
    """Validate smart deduplication functionality."""
    
    initial_insights = initial_state.get('user_memo', {}).get('preferences', {}).get('external_insights', [])
    updated_insights = result_state.get('user_memo', {}).get('preferences', {}).get('external_insights', [])
    
    if len(updated_insights) == len(initial_insights):
        print("     ✅ 중복 인사이트 추가 방지됨")
    else:
        print("     ❌ 중복 검출 실패")


def _validate_json_structure(result_state: dict):
    """Validate JSON structure integrity."""
    
    updated_memo = result_state.get('user_memo', {})
    
    # Check required structure
    required_fields = ['version', 'update_history', 'metadata']
    missing_fields = [field for field in required_fields if field not in updated_memo]
    
    if not missing_fields:
        print(f"   🏗️  JSON 구조 무결성: ✅")
    else:
        print(f"   🏗️  JSON 구조 무결성: ❌ (누락: {missing_fields})")
    
    # Check history structure
    history = updated_memo.get('update_history', [])
    if history and isinstance(history, list):
        print(f"   📚 업데이트 이력: {len(history)}개 엔트리")
    
    # Check version increment
    version = updated_memo.get('version', '1.0')
    print(f"   🔢 현재 버전: {version}")


# 테스트 실행 함수
if __name__ == "__main__":
    test_memo_update_node()

🧪 memo_update_node 개선 버전 테스트 시작

테스트 1: 충돌 없는 신규 정보 업데이트
시나리오: no_conflict_update
----------------------------------------
💾 Memory updated successfully: 2 changes
✅ Memory update completed: {'total_updates': 2, 'profile_updates': 2, 'preference_updates': 0, 'search_history_updates': 0, 'conflicts_resolved': 0, 'history_entries_total': 1}
📊 결과 분석:
   상태: ok
   ✅ 성공적으로 실행됨
   📝 업데이트 수: 2
   📈 요약: {'total_updates': 2, 'profile_updates': 2, 'preference_updates': 0, 'search_history_updates': 0, 'conflicts_resolved': 0, 'history_entries_total': 1}
     ✅ 결혼일 업데이트 확인
     ✅ 예산 업데이트 확인
   🏗️  JSON 구조 무결성: ✅
   📚 업데이트 이력: 1개 엔트리
   🔢 현재 버전: 1.1
------------------------------------------------------------

테스트 2: 기존 정보와 충돌 발생 (사용자 확인 필요)
시나리오: conflict_detection
----------------------------------------
📊 결과 분석:
   상태: user_confirmation_required
   🔄 사용자 확인 필요
   🔍 감지된 충돌: 2개
   ❓ 필요한 확인: 2개
     - wedding_date: '2025-09-20' → '2025-11-01'
     - total_budget_manwon: '3000' → '4500'
     ✅ weddi

In [46]:
# Cell 11: general_response_node 구현

def general_response_node(state: dict) -> dict:
    """
    General response node that handles non-specific queries with contextual wedding topic guidance.
    
    This node provides comprehensive responses to general questions while maintaining user engagement
    through natural conversation flow. The node serves as the fallback handler for queries that
    don't require specific tool execution or vendor recommendations.
    
    Core Functionality:
    - Processes general questions with thorough, helpful responses
    - Maintains natural conversation flow without forced topic redirection
    - Subtly guides conversation toward wedding planning topics when appropriate
    - Leverages user memory context to personalize responses
    - Provides FAQ-style answers for common wedding planning questions
    - Handles casual conversation and relationship-building interactions
    
    Response Strategy:
    - Primary Focus: Answer the user's actual question comprehensively
    - Secondary Goal: Natural topic bridging to wedding planning when contextually appropriate
    - Personalization: Incorporate user profile information when relevant
    - Tone Management: Maintain helpful, friendly, and professional tone
    - Engagement: End with gentle conversation steering toward wedding topics
    
    The node avoids forced topic changes but creates natural opportunities for wedding-related
    follow-up questions through contextual bridges and relevant suggestions.
    
    Args:
        state (dict): State containing user_input, user_memo, and conversation context
        
    Returns:
        dict: Updated state with response_content and conversation guidance
    """
    
    user_input = state.get('user_input', '').strip()
    user_memo = state.get('user_memo', {})
    user_id = state.get('user_id')
    
    try:
        if not user_input:
            state['status'] = "error"
            state['reason'] = "No user input provided for general response"
            return state
        
        # Analyze input to determine response approach
        response_context = _analyze_input_context(user_input, user_memo)
        
        # Generate core response based on question type
        core_response = _generate_core_response(user_input, response_context, user_memo)
        
        # Add natural wedding topic bridge if appropriate
        final_response = _add_wedding_topic_bridge(core_response, response_context, user_memo)
        
        # Update state with response
        state['response_content'] = final_response
        state['response_metadata'] = {
            'response_type': 'general_response',
            'topic_bridge_added': response_context.get('bridge_appropriate', False),
            'personalization_level': response_context.get('personalization_level', 'basic'),
            'generated_at': datetime.now().isoformat()
        }
        
        state['status'] = "ok"
        
        print(f"💬 일반 응답 생성 완료 (길이: {len(final_response)}자)")
        
        return state
        
    except Exception as e:
        state['status'] = "error"
        state['reason'] = f"General response generation failed: {str(e)}"
        print(f"❌ 일반 응답 생성 오류: {str(e)}")
        return state


def _analyze_input_context(user_input: str, user_memo: dict) -> dict:
    """Analyze user input to determine appropriate response strategy."""
    
    input_lower = user_input.lower()
    profile = user_memo.get('profile', {})
    
    context = {
        'question_type': 'general',
        'topic_category': 'other',
        'personalization_level': 'basic',
        'bridge_appropriate': True,
        'wedding_related': False
    }
    
    # Detect question types
    if any(word in input_lower for word in ['안녕', '하이', '좋은', '날씨', '기분']):
        context['question_type'] = 'greeting'
        context['topic_category'] = 'casual'
    
    elif any(word in input_lower for word in ['뭐', '무엇', '어떻게', '왜', '언제', '어디서']):
        context['question_type'] = 'inquiry'
        context['topic_category'] = 'informational'
    
    elif any(word in input_lower for word in ['결혼', '웨딩', '신혼', '결혼식', '예식', '신부', '신랑']):
        context['wedding_related'] = True
        context['question_type'] = 'wedding_general'
        context['topic_category'] = 'wedding'
        context['bridge_appropriate'] = False  # Already wedding-related
    
    elif any(word in input_lower for word in ['감사', '고마워', '도움', '좋아']):
        context['question_type'] = 'appreciation'
        context['topic_category'] = 'positive'
    
    elif any(word in input_lower for word in ['힘들', '어려워', '고민', '걱정']):
        context['question_type'] = 'concern'
        context['topic_category'] = 'supportive'
    
    # Determine personalization level based on available user info
    if profile.get('name') or profile.get('wedding_date'):
        context['personalization_level'] = 'high'
    elif profile.get('user_id'):
        context['personalization_level'] = 'medium'
    
    # Adjust bridge appropriateness based on context
    if context['question_type'] in ['appreciation', 'concern']:
        context['bridge_appropriate'] = True
    elif context['topic_category'] == 'casual':
        context['bridge_appropriate'] = True
    
    return context


def _generate_core_response(user_input: str, context: dict, user_memo: dict) -> str:
    """Generate core response based on question type and context."""
    
    question_type = context.get('question_type', 'general')
    profile = user_memo.get('profile', {})
    user_name = profile.get('name', '')
    
    # Response templates based on question type
    if question_type == 'greeting':
        responses = [
            f"안녕하세요{f' {user_name}님' if user_name else ''}! 오늘 하루는 어떻게 보내고 계신가요?",
            f"반가워요{f' {user_name}님' if user_name else ''}! 무엇을 도와드릴까요?",
            "좋은 하루네요! 오늘은 어떤 일이 있으셨나요?"
        ]
        return _select_appropriate_response(responses, context)
    
    elif question_type == 'wedding_general':
        return _handle_wedding_general_question(user_input, user_memo)
    
    elif question_type == 'inquiry':
        return _handle_general_inquiry(user_input, context, user_memo)
    
    elif question_type == 'appreciation':
        responses = [
            "도움이 되었다니 정말 기뻐요! 언제든 궁금한 것이 있으시면 말씀해 주세요.",
            "감사하다고 말씀해 주셔서 감동이에요. 더 도움이 필요하시면 언제든지 연락해 주세요!",
            "천만에요! 여러분의 만족스러운 반응이 저에게는 최고의 보상입니다."
        ]
        return _select_appropriate_response(responses, context)
    
    elif question_type == 'concern':
        responses = [
            "걱정이 많으시군요. 천천히 하나씩 해결해 나가면 분명히 좋은 결과가 있을 거예요.",
            "어려운 상황이시네요. 하지만 모든 문제에는 해결책이 있다고 생각해요. 함께 찾아볼까요?",
            "힘든 시간을 보내고 계신 것 같아요. 제가 도울 수 있는 부분이 있다면 언제든 말씀해 주세요."
        ]
        return _select_appropriate_response(responses, context)
    
    else:
        # General fallback response
        return _generate_general_fallback_response(user_input, user_memo)


def _handle_wedding_general_question(user_input: str, user_memo: dict) -> str:
    """Handle general wedding-related questions."""
    
    profile = user_memo.get('profile', {})
    input_lower = user_input.lower()
    
    if '준비' in input_lower and ('힘들' in input_lower or '어려워' in input_lower):
        return """결혼 준비가 힘드시죠? 정말 많은 분들이 같은 고민을 하세요. 
결혼식 준비는 단계적으로 접근하는 것이 중요해요. 
먼저 예산과 날짜, 하객 규모를 정하고, 그 다음에 예식장과 스튜디오를 선택하시는 것을 추천드려요."""
    
    elif '언제' in input_lower and '시작' in input_lower:
        wedding_date = profile.get('wedding_date')
        if wedding_date:
            return f"""결혼식이 {wedding_date}로 예정되어 있으시니, 지금부터 차근차근 준비하시면 충분해요.
보통 결혼식 3-6개월 전부터 본격적으로 준비하시는 분들이 많아요.
예식장 예약은 빠를수록 좋고, 드레스나 턱시도는 2-3개월 전에 준비하시면 됩니다."""
        else:
            return """결혼 준비는 보통 결혼식 3-6개월 전부터 시작하시는 것을 추천해요.
먼저 예식 날짜부터 정하시는 것이 좋겠네요."""
    
    else:
        return """결혼 준비에 대해 궁금한 점이 있으시군요! 
결혼식 준비는 생각보다 많은 것들을 고려해야 하지만, 체계적으로 접근하면 충분히 해낼 수 있어요.
예산, 날짜, 예식장, 스튜디오 등 어떤 부분이 가장 궁금하신가요?"""


def _handle_general_inquiry(user_input: str, context: dict, user_memo: dict) -> str:
    """Handle general inquiry questions."""
    
    input_lower = user_input.lower()
    
    # Common general questions with helpful responses
    if any(word in input_lower for word in ['날씨', '기온', '온도']):
        return """오늘 날씨 정보는 날씨 앱이나 포털 사이트에서 정확히 확인하실 수 있어요.
날씨가 좋은 날이면 야외 웨딩이나 스튜디오 촬영하기에도 좋겠네요!"""
    
    elif any(word in input_lower for word in ['시간', '몇시', '언제']):
        return """정확한 시간 정보가 필요하시군요. 
결혼식 시간 계획을 세우실 때는 보통 낮 12시나 2시, 4시에 예식을 많이 하세요."""
    
    elif any(word in input_lower for word in ['음식', '요리', '맛집']):
        return """맛있는 음식에 관심이 많으시네요! 
결혼식 피로연이나 신혼여행 맛집 투어도 미리 계획해 보시면 어떨까요?"""
    
    elif any(word in input_lower for word in ['돈', '비용', '가격']):
        return """비용에 대해 관심이 있으시군요. 
결혼 준비도 예산 계획을 미리 세워두시면 훨씬 수월하게 진행하실 수 있어요."""
    
    else:
        return f"""'{user_input}'에 대해 구체적인 정보를 제공해 드리기는 어렵지만, 
관련된 정보를 찾아서 도움을 드리고 싶어요."""


def _generate_general_fallback_response(user_input: str, user_memo: dict) -> str:
    """Generate fallback response for general questions."""
    
    profile = user_memo.get('profile', {})
    user_name = profile.get('name', '')
    
    responses = [
        f"흥미로운 질문이네요{f' {user_name}님' if user_name else ''}! 더 구체적으로 설명해 주시면 더 도움이 될 것 같아요.",
        f"좋은 점을 말씀해 주셨네요. 조금 더 자세히 알려주시면 더 정확한 답변을 드릴 수 있을 것 같아요.",
        f"그런 관점에서 생각해 보시는군요! 어떤 부분이 가장 궁금하신지 알려주시면 좋겠어요."
    ]
    
    return responses[hash(user_input) % len(responses)]


def _add_wedding_topic_bridge(core_response: str, context: dict, user_memo: dict) -> str:
    """Add natural wedding topic bridge to the response if appropriate."""
    
    if not context.get('bridge_appropriate', False):
        return core_response
    
    if context.get('wedding_related', False):
        return core_response  # Already wedding-related, no bridge needed
    
    profile = user_memo.get('profile', {})
    wedding_date = profile.get('wedding_date')
    
    # Generate contextual bridges based on user profile
    if wedding_date:
        bridge_options = [
            f" 그런데 {wedding_date} 결혼식 준비는 어떻게 진행되고 있나요?",
            f" 참, 결혼식 준비 중이신데 도움이 필요한 부분은 없으신가요?",
            f" 결혼 준비로 바쁘실 텐데, 다른 궁금한 점은 없으신지요?"
        ]
    elif profile.get('total_budget_manwon'):
        budget = profile.get('total_budget_manwon')
        bridge_options = [
            f" 결혼 준비 예산 {budget}만원으로 계획하고 계시는데, 어떤 부분부터 시작해볼까요?",
            " 결혼 준비는 어떻게 진행되고 있나요?",
            " 혹시 결혼 준비 관련해서 궁금한 점이 있으시면 언제든 물어보세요!"
        ]
    else:
        bridge_options = [
            " 혹시 결혼 준비 계획이 있으시다면 언제든 도움을 요청해 주세요!",
            " 결혼이나 웨딩과 관련된 궁금한 점이 있으시면 편하게 말씀해 주세요.",
            " 결혼 준비에 대한 조언이 필요하시면 언제든지 연락해 주세요!"
        ]
    
    # Select appropriate bridge based on context
    question_type = context.get('question_type', 'general')
    if question_type == 'concern':
        # More supportive bridge for concerns
        bridge = " 결혼 준비로 고민이 있으시다면 함께 해결책을 찾아보아요!"
    elif question_type == 'appreciation':
        # Encouraging bridge for positive interactions
        bridge = " 결혼 준비도 이렇게 긍정적인 마음으로 하시면 분명 멋진 결과가 있을 거예요!"
    else:
        bridge = bridge_options[hash(core_response) % len(bridge_options)]
    
    return core_response + bridge


def _select_appropriate_response(responses: List[str], context: dict) -> str:
    """Select most appropriate response based on context."""
    
    personalization = context.get('personalization_level', 'basic')
    
    if personalization == 'high':
        return responses[0]  # Most personalized
    elif personalization == 'medium':
        return responses[1] if len(responses) > 1 else responses[0]
    else:
        return responses[-1]  # Most generic

In [48]:
# Cell 12: general_response_node 테스트

def test_general_response_node():
    """Comprehensive test suite for general_response_node functionality."""
    
    print("🧪 general_response_node 테스트 시작\n")
    
    test_cases = [
        {
            "name": "인사/일반 대화 테스트",
            "scenario": "greeting",
            "state": _create_test_state_greeting()
        },
        {
            "name": "일반적인 질문 테스트",
            "scenario": "general_inquiry", 
            "state": _create_test_state_general_inquiry()
        },
        {
            "name": "결혼 관련 일반 질문 테스트",
            "scenario": "wedding_general",
            "state": _create_test_state_wedding_general()
        },
        {
            "name": "감사/긍정적 반응 테스트",
            "scenario": "appreciation",
            "state": _create_test_state_appreciation()
        },
        {
            "name": "고민/걱정 표현 테스트",
            "scenario": "concern",
            "state": _create_test_state_concern()
        },
        {
            "name": "개인화 레벨 테스트 (프로필 있음)",
            "scenario": "personalized",
            "state": _create_test_state_personalized()
        },
        {
            "name": "개인화 레벨 테스트 (프로필 없음)",
            "scenario": "non_personalized",
            "state": _create_test_state_non_personalized()
        },
        {
            "name": "결혼 토픽 브릿지 테스트",
            "scenario": "topic_bridge",
            "state": _create_test_state_topic_bridge()
        }
    ]
    
    for i, test_case in enumerate(test_cases, 1):
        print(f"{'='*60}")
        print(f"테스트 {i}: {test_case['name']}")
        print(f"시나리오: {test_case['scenario']}")
        print(f"입력: '{test_case['state']['user_input']}'")
        print("-" * 40)
        
        # Execute general_response_node
        try:
            initial_state = test_case['state'].copy()
            result_state = general_response_node(test_case['state'])
            
            # Analyze results
            _analyze_general_response_results(test_case, initial_state, result_state)
            
        except Exception as e:
            print(f"❌ 테스트 실행 중 오류: {str(e)}")
            
        print("-" * 60)
        print()
    
    print("🧪 general_response_node 테스트 완료")


def _create_test_state_greeting() -> dict:
    """Create test state for greeting scenario."""
    
    return {
        "user_id": "test_user_001",
        "user_input": "안녕하세요! 좋은 아침이에요",
        "user_memo": {
            "profile": {
                "user_id": "test_user_001",
                "name": "김철수"
            }
        }
    }


def _create_test_state_general_inquiry() -> dict:
    """Create test state for general inquiry scenario."""
    
    return {
        "user_id": "test_user_002",
        "user_input": "오늘 날씨가 어떤가요?",
        "user_memo": {
            "profile": {
                "user_id": "test_user_002"
            }
        }
    }


def _create_test_state_wedding_general() -> dict:
    """Create test state for wedding general question scenario."""
    
    return {
        "user_id": "test_user_003",
        "user_input": "결혼 준비가 너무 힘들어요. 언제부터 시작해야 하나요?",
        "user_memo": {
            "profile": {
                "user_id": "test_user_003",
                "name": "이영희",
                "wedding_date": "2025-10-15"
            }
        }
    }


def _create_test_state_appreciation() -> dict:
    """Create test state for appreciation scenario."""
    
    return {
        "user_id": "test_user_004", 
        "user_input": "정말 감사해요! 많은 도움이 되었어요",
        "user_memo": {
            "profile": {
                "user_id": "test_user_004",
                "total_budget_manwon": 4000
            }
        }
    }


def _create_test_state_concern() -> dict:
    """Create test state for concern scenario."""
    
    return {
        "user_id": "test_user_005",
        "user_input": "요즘 고민이 너무 많아서 힘들어요",
        "user_memo": {
            "profile": {
                "user_id": "test_user_005",
                "name": "박민수"
            }
        }
    }


def _create_test_state_personalized() -> dict:
    """Create test state for personalized response scenario."""
    
    return {
        "user_id": "test_user_006",
        "user_input": "오늘 하루 어떻게 보내셨나요?",
        "user_memo": {
            "profile": {
                "user_id": "test_user_006",
                "name": "최지영",
                "wedding_date": "2025-12-01",
                "total_budget_manwon": 5000
            }
        }
    }


def _create_test_state_non_personalized() -> dict:
    """Create test state for non-personalized response scenario."""
    
    return {
        "user_id": "test_user_007",
        "user_input": "안녕하세요",
        "user_memo": {
            "profile": {
                "user_id": "test_user_007"
                # No name or other personal info
            }
        }
    }


def _create_test_state_topic_bridge() -> dict:
    """Create test state for topic bridge scenario."""
    
    return {
        "user_id": "test_user_008",
        "user_input": "맛있는 음식 추천해주세요",
        "user_memo": {
            "profile": {
                "user_id": "test_user_008",
                "name": "정현우",
                "wedding_date": "2025-11-15"
            }
        }
    }


def _analyze_general_response_results(test_case: dict, initial_state: dict, result_state: dict):
    """Analyze and display general response test results."""
    
    status = result_state.get('status', 'unknown')
    scenario = test_case['scenario']
    
    print(f"📊 결과 분석:")
    print(f"   상태: {status}")
    
    if status == "ok":
        print("   ✅ 성공적으로 실행됨")
        
        response_content = result_state.get('response_content', '')
        response_metadata = result_state.get('response_metadata', {})
        
        print(f"   📝 응답 길이: {len(response_content)}자")
        print(f"   🎯 응답 타입: {response_metadata.get('response_type', 'unknown')}")
        
        # Display first part of response for review
        preview = response_content[:100] + "..." if len(response_content) > 100 else response_content
        print(f"   💬 응답 미리보기: {preview}")
        
        # Analyze scenario-specific results
        _validate_scenario_response(scenario, initial_state, result_state)
        
        # Check topic bridge
        bridge_added = response_metadata.get('topic_bridge_added', False)
        personalization_level = response_metadata.get('personalization_level', 'basic')
        
        print(f"   🌉 토픽 브릿지 추가: {'예' if bridge_added else '아니오'}")
        print(f"   👤 개인화 레벨: {personalization_level}")
        
        # Check wedding topic guidance
        _check_wedding_guidance(response_content, scenario, initial_state)
        
    else:
        print("   ❌ 실행 실패")
        reason = result_state.get('reason', 'Unknown error')
        print(f"   🚨 실패 이유: {reason}")


def _validate_scenario_response(scenario: str, initial_state: dict, result_state: dict):
    """Validate response based on specific scenario."""
    
    response_content = result_state.get('response_content', '')
    user_input = initial_state.get('user_input', '')
    user_memo = initial_state.get('user_memo', {})
    
    if scenario == "greeting":
        if any(word in response_content for word in ['안녕', '반가', '하루']):
            print("     ✅ 인사 응답 적절함")
        else:
            print("     ❌ 인사 응답 부적절함")
    
    elif scenario == "general_inquiry":
        if '날씨' in user_input and '날씨' in response_content:
            print("     ✅ 질문 내용 반영됨")
        else:
            print("     ❌ 질문 내용 미반영")
    
    elif scenario == "wedding_general":
        if any(word in response_content for word in ['결혼', '준비', '예식']):
            print("     ✅ 결혼 관련 응답 적절함")
        else:
            print("     ❌ 결혼 관련 응답 부족")
    
    elif scenario == "appreciation":
        if any(word in response_content for word in ['감사', '기뻐', '도움']):
            print("     ✅ 감사 응답 적절함")
        else:
            print("     ❌ 감사 응답 부적절함")
    
    elif scenario == "concern":
        if any(word in response_content for word in ['걱정', '힘든', '해결', '도움']):
            print("     ✅ 위로/지원 응답 적절함")
        else:
            print("     ❌ 위로/지원 응답 부족")
    
    elif scenario == "personalized":
        user_name = user_memo.get('profile', {}).get('name', '')
        if user_name and user_name in response_content:
            print("     ✅ 개인화된 응답 (이름 포함)")
        else:
            print("     ❌ 개인화 부족")
    
    elif scenario == "non_personalized":
        response_metadata = result_state.get('response_metadata', {})
        personalization = response_metadata.get('personalization_level', 'basic')
        if personalization == 'basic':
            print("     ✅ 기본 응답 레벨 적절함")
        else:
            print("     ❌ 개인화 레벨 예상과 다름")


def _check_wedding_guidance(response_content: str, scenario: str, initial_state: dict):
    """Check if wedding topic guidance is appropriately included."""
    
    wedding_keywords = ['결혼', '웨딩', '예식', '신혼', '결혼식']
    has_wedding_guidance = any(keyword in response_content for keyword in wedding_keywords)
    
    # Determine if guidance is expected
    user_memo = initial_state.get('user_memo', {})
    wedding_date = user_memo.get('profile', {}).get('wedding_date')
    
    if scenario == "wedding_general":
        # Already wedding-related, guidance not necessary
        if has_wedding_guidance:
            print("     ✅ 결혼 관련 내용 포함")
        else:
            print("     ❌ 결혼 관련 내용 부족")
    
    elif scenario in ["greeting", "general_inquiry", "appreciation", "concern", "topic_bridge"]:
        # Should have some wedding guidance
        if has_wedding_guidance:
            print("     ✅ 결혼 토픽 유도 포함")
        else:
            print("     ⚠️  결혼 토픽 유도 없음")
    
    # Check quality of guidance
    if has_wedding_guidance:
        # Check if guidance feels natural (not forced)
        if any(phrase in response_content for phrase in ['혹시', '참', '그런데']):
            print("     ✅ 자연스러운 토픽 전환")
        elif response_content.count('결혼') > 2:
            print("     ⚠️  토픽 전환이 다소 억지스러움")
        else:
            print("     ✅ 적절한 토픽 가이던스")


def _display_full_responses():
    """Display full responses for manual review."""
    
    print("\n" + "="*60)
    print("📋 전체 응답 샘플 리뷰")
    print("="*60)
    
    sample_states = [
        {
            "name": "인사 + 개인화",
            "state": {
                "user_input": "안녕하세요!",
                "user_memo": {
                    "profile": {
                        "name": "김철수",
                        "wedding_date": "2025-10-15"
                    }
                }
            }
        },
        {
            "name": "일반 질문 + 브릿지",
            "state": {
                "user_input": "오늘 날씨가 어떤가요?", 
                "user_memo": {
                    "profile": {
                        "total_budget_manwon": 3000
                    }
                }
            }
        },
        {
            "name": "고민 상담 + 지원",
            "state": {
                "user_input": "요즘 스트레스가 많아요",
                "user_memo": {
                    "profile": {
                        "name": "이영희"
                    }
                }
            }
        }
    ]
    
    for sample in sample_states:
        print(f"\n🔸 {sample['name']}")
        print(f"입력: {sample['state']['user_input']}")
        
        try:
            result = general_response_node(sample['state'])
            response = result.get('response_content', '응답 생성 실패')
            print(f"응답: {response}")
        except Exception as e:
            print(f"오류: {str(e)}")
        
        print("-" * 40)


# 테스트 실행 함수들
if __name__ == "__main__":
    test_general_response_node()
    
    # Uncomment to see full response samples
    _display_full_responses()

🧪 general_response_node 테스트 시작

테스트 1: 인사/일반 대화 테스트
시나리오: greeting
입력: '안녕하세요! 좋은 아침이에요'
----------------------------------------
💬 일반 응답 생성 완료 (길이: 70자)
📊 결과 분석:
   상태: ok
   ✅ 성공적으로 실행됨
   📝 응답 길이: 70자
   🎯 응답 타입: general_response
   💬 응답 미리보기: 안녕하세요 김철수님! 오늘 하루는 어떻게 보내고 계신가요? 결혼이나 웨딩과 관련된 궁금한 점이 있으시면 편하게 말씀해 주세요.
     ✅ 인사 응답 적절함
   🌉 토픽 브릿지 추가: 예
   👤 개인화 레벨: high
     ✅ 결혼 토픽 유도 포함
     ✅ 적절한 토픽 가이던스
------------------------------------------------------------

테스트 2: 일반적인 질문 테스트
시나리오: general_inquiry
입력: '오늘 날씨가 어떤가요?'
----------------------------------------
💬 일반 응답 생성 완료 (길이: 53자)
📊 결과 분석:
   상태: ok
   ✅ 성공적으로 실행됨
   📝 응답 길이: 53자
   🎯 응답 타입: general_response
   💬 응답 미리보기: 반가워요! 무엇을 도와드릴까요? 혹시 결혼 준비 계획이 있으시다면 언제든 도움을 요청해 주세요!
     ❌ 질문 내용 미반영
   🌉 토픽 브릿지 추가: 예
   👤 개인화 레벨: medium
     ✅ 결혼 토픽 유도 포함
     ✅ 자연스러운 토픽 전환
------------------------------------------------------------

테스트 3: 결혼 관련 일반 질문 테스트
시나리오: wedding_general
입력: '결혼 준비가 너무 힘들어요. 언제부터 시작해야 하나요?'
--------------------

In [49]:
# Cell 13: response_generation_node 구현

def response_generation_node(state: dict) -> dict:
    """
    Final response synthesis node that consolidates results from all processing nodes into user-friendly output.
    
    This node serves as the comprehensive response orchestrator that transforms raw processing results
    into polished, contextually appropriate, and engaging final responses for users. It acts as the
    critical bridge between internal system processing and external user communication.
    
    Core Responsibilities:
    - Multi-source Response Integration: Synthesizes outputs from tool_execution, general_response, 
      recommendation, and memo_update nodes into coherent final responses
    - Content Quality Assurance: Applies filtering, prioritization, and quality checks to ensure 
      response accuracy and relevance
    - User Experience Optimization: Formats responses with appropriate tone, structure, and 
      personalization based on user context and preferences
    - Error Recovery and Graceful Degradation: Handles partial failures and missing data scenarios 
      while maintaining response quality
    - Context Preservation: Maintains conversation flow and references previous interactions 
      when contextually appropriate
    
    Response Processing Pipeline:
    1. Input Validation: Verifies availability and quality of source content from upstream nodes
    2. Content Prioritization: Determines primary response source based on routing decision and data quality
    3. Multi-modal Integration: Combines tool results, recommendations, and general responses appropriately
    4. Personalization Layer: Applies user-specific customizations using profile and preference data
    5. Quality Enhancement: Improves readability, adds helpful context, and ensures completeness
    6. Format Optimization: Structures response for optimal user comprehension and engagement
    7. Metadata Generation: Creates comprehensive response metadata for analytics and debugging
    
    Content Filtering Strategy:
    - Relevance Filtering: Removes or deprioritizes content not directly related to user query
    - Accuracy Validation: Cross-references tool results for consistency and reliability
    - Completeness Assessment: Identifies and addresses gaps in information coverage
    - Duplication Elimination: Prevents redundant information while preserving important details
    - Sensitivity Screening: Ensures appropriate content for wedding planning context
    
    Response Formatting Features:
    - Dynamic Structure Adaptation: Adjusts response format based on content type and user preferences
    - Progressive Information Disclosure: Presents complex information in digestible segments
    - Actionable Guidance Integration: Includes next steps and follow-up suggestions when appropriate
    - Visual Enhancement Hints: Provides formatting cues for improved readability
    - Conversation Flow Maintenance: Ensures natural dialogue progression
    
    Error Handling and Recovery:
    - Partial Response Generation: Creates meaningful responses even when some data sources fail
    - Fallback Content Strategy: Maintains user engagement during system limitations
    - Error Context Preservation: Retains error information for debugging while presenting user-friendly messages
    - Graceful Degradation: Reduces response scope rather than failing completely
    
    The node prioritizes user satisfaction and engagement while maintaining high standards for
    accuracy, relevance, and helpfulness across all response scenarios.
    
    Args:
        state (dict): Complete system state containing routing decisions, tool results, 
                     response content, user context, and processing metadata
        
    Returns:
        dict: Enhanced state with final_response, comprehensive response metadata, 
              quality metrics, and completion status
    """
    
    # Extract comprehensive state information
    routing_decision = state.get('routing_decision')
    tool_results = state.get('tool_results', {})
    response_content = state.get('response_content', '')
    user_memo = state.get('user_memo', {})
    user_input = state.get('user_input', '')
    memo_updates_made = state.get('memo_updates_made', [])
    update_summary = state.get('update_summary', {})
    user_id = state.get('user_id')
    
    # Initialize response generation tracking
    generation_metadata = {
        'generation_timestamp': datetime.now().isoformat(),
        'primary_content_source': 'unknown',
        'content_sources_used': [],
        'personalization_applied': False,
        'quality_score': 0.0,
        'response_length': 0,
        'processing_notes': []
    }
    
    try:
        # Validate essential inputs
        if not user_id:
            return _handle_critical_error(state, "No user_id available for response generation", generation_metadata)
        
        if not user_input.strip():
            return _handle_critical_error(state, "No user input available for response context", generation_metadata)
        
        # Determine primary response source and strategy
        response_strategy = _determine_response_strategy(routing_decision, response_content, tool_results, state)
        generation_metadata['primary_content_source'] = response_strategy['primary_source']
        generation_metadata['content_sources_used'] = response_strategy['sources_used']
        
        # Generate core response content based on strategy
        core_response = _generate_core_response_content(response_strategy, state)
        
        # Apply intelligent content filtering and enhancement
        filtered_content = _apply_content_filtering(core_response, response_strategy, state)
        
        # Add contextual enhancements and personalization
        enhanced_response = _apply_personalization_and_context(filtered_content, user_memo, state)
        
        # Integrate tool results and memory updates if relevant
        integrated_response = _integrate_tool_results_and_updates(
            enhanced_response, tool_results, memo_updates_made, update_summary, response_strategy
        )
        
        # Apply final formatting and quality improvements
        final_response = _apply_final_formatting(integrated_response, response_strategy, user_memo)
        
        # Generate comprehensive response metadata
        generation_metadata.update(_generate_response_metadata(final_response, response_strategy, state))
        
        # Update state with final results
        state['final_response'] = final_response
        state['response_metadata'] = generation_metadata
        state['status'] = "completed"
        state['processing_completed_at'] = datetime.now().isoformat()
        
        # Log successful completion
        response_length = len(final_response)
        primary_source = generation_metadata['primary_content_source']
        quality_score = generation_metadata['quality_score']
        
        print(f"✅ 최종 응답 생성 완료")
        print(f"   📏 응답 길이: {response_length}자")
        print(f"   📍 주요 소스: {primary_source}")
        print(f"   ⭐ 품질 점수: {quality_score:.2f}/5.0")
        print(f"   🔧 사용된 소스: {', '.join(generation_metadata['content_sources_used'])}")
        
        return state
        
    except Exception as e:
        return _handle_generation_error(state, str(e), generation_metadata)


def _determine_response_strategy(routing_decision: str, response_content: str, tool_results: dict, state: dict) -> dict:
    """Determine the optimal response generation strategy based on available content."""
    
    strategy = {
        'primary_source': 'unknown',
        'sources_used': [],
        'response_type': 'hybrid',
        'personalization_level': 'medium',
        'formatting_style': 'conversational',
        'include_tool_details': False,
        'include_next_steps': True
    }
    
    # Analyze available content sources
    has_tool_results = bool(tool_results and any(r.get('success') for r in tool_results.values()))
    has_response_content = bool(response_content and response_content.strip())
    has_memo_updates = bool(state.get('memo_updates_made'))
    
    # Determine primary source based on routing decision and content quality
    if routing_decision == "tool_execution" and has_tool_results:
        strategy['primary_source'] = 'tool_results'
        strategy['response_type'] = 'data_driven'
        strategy['include_tool_details'] = True
        strategy['sources_used'].append('tool_execution')
        
    elif routing_decision == "general_response" and has_response_content:
        strategy['primary_source'] = 'response_content'
        strategy['response_type'] = 'conversational'
        strategy['formatting_style'] = 'natural'
        strategy['sources_used'].append('general_response')
        
    elif routing_decision == "recommendation" and has_tool_results:
        strategy['primary_source'] = 'recommendations'
        strategy['response_type'] = 'recommendation_focused'
        strategy['include_next_steps'] = True
        strategy['sources_used'].append('recommendation_system')
        
    else:
        # Fallback strategy - use best available content
        if has_response_content:
            strategy['primary_source'] = 'response_content'
            strategy['sources_used'].append('general_response')
        elif has_tool_results:
            strategy['primary_source'] = 'tool_results'
            strategy['sources_used'].append('tool_execution')
        else:
            strategy['primary_source'] = 'fallback'
            strategy['response_type'] = 'minimal'
    
    # Add supplementary sources
    if has_memo_updates:
        strategy['sources_used'].append('memo_updates')
    
    # Adjust personalization based on user data
    user_memo = state.get('user_memo', {})
    profile = user_memo.get('profile', {})
    
    if profile.get('name') and profile.get('wedding_date'):
        strategy['personalization_level'] = 'high'
    elif profile.get('user_id'):
        strategy['personalization_level'] = 'medium'
    else:
        strategy['personalization_level'] = 'low'
    
    return strategy


def _generate_core_response_content(strategy: dict, state: dict) -> str:
    """Generate core response content based on the determined strategy."""
    
    primary_source = strategy['primary_source']
    
    if primary_source == 'tool_results':
        return _generate_tool_results_response(state.get('tool_results', {}), state)
        
    elif primary_source == 'response_content':
        return state.get('response_content', '')
        
    elif primary_source == 'recommendations':
        return _generate_recommendation_response(state.get('recommendations', []), state)
        
    elif primary_source == 'fallback':
        return _generate_fallback_response(state)
        
    else:
        return _generate_minimal_response(state)


def _generate_tool_results_response(tool_results: dict, state: dict) -> str:
    """Generate response based on tool execution results."""
    
    successful_results = {name: result for name, result in tool_results.items() 
                         if result.get('success', False)}
    
    if not successful_results:
        return "요청을 처리했지만 구체적인 결과를 가져올 수 없었습니다. 다시 시도해 주시거나 다른 방식으로 질문해 주세요."
    
    response_parts = []
    user_input = state.get('user_input', '')
    
    # Process different types of tool results
    for tool_name, result in successful_results.items():
        tool_data = result.get('data', {})
        
        if tool_name == 'db_query_tool':
            response_parts.append(_format_db_query_results(tool_data, user_input))
            
        elif tool_name == 'user_db_update_tool':
            response_parts.append(_format_update_results(tool_data, user_input))
            
        elif tool_name == 'web_search_tool':
            response_parts.append(_format_web_search_results(tool_data, user_input))
            
        elif tool_name == 'calculator_tool':
            response_parts.append(_format_calculator_results(tool_data, user_input))
    
    if not response_parts:
        return "처리가 완료되었습니다. 추가로 궁금한 점이 있으시면 말씀해 주세요."
    
    return '\n\n'.join(filter(None, response_parts))


def _format_db_query_results(tool_data: dict, user_input: str) -> str:
    """Format database query results into user-friendly response."""
    
    results_count = tool_data.get('results_count', 0)
    vendor_type = tool_data.get('vendor_type', '업체')
    region = tool_data.get('region', '')
    results = tool_data.get('results', [])
    
    if results_count == 0:
        return f"죄송합니다. {region} 지역의 {vendor_type} 정보를 찾을 수 없었습니다. 다른 지역이나 조건으로 검색해 보시겠어요?"
    
    response = f"{region} 지역에서 {vendor_type} {results_count}곳을 찾았습니다.\n\n"
    
    # Show top results if available
    if results:
        response += "추천 업체:\n"
        for i, venue in enumerate(results[:3], 1):
            name = venue.get('name', f'{vendor_type} {i}')
            rating = venue.get('rating', 0)
            response += f"{i}. {name}"
            if rating > 0:
                response += f" (평점: {rating}⭐)"
            response += "\n"
    
    return response + "\n더 자세한 정보가 필요하시면 말씀해 주세요!"


def _format_update_results(tool_data: dict, user_input: str) -> str:
    """Format user data update results into confirmation message."""
    
    updated_fields = tool_data.get('updated_fields', {})
    
    if not updated_fields:
        return "업데이트가 완료되었습니다."
    
    response = "다음 정보가 업데이트되었습니다:\n\n"
    
    field_names = {
        'wedding_date': '결혼식 날짜',
        'total_budget_manwon': '총 예산',
        'guest_count': '하객 수',
        'name': '이름',
        'partner_name': '파트너 이름'
    }
    
    for field, value in updated_fields.items():
        display_name = field_names.get(field, field)
        if field == 'total_budget_manwon':
            response += f"• {display_name}: {value}만원\n"
        else:
            response += f"• {display_name}: {value}\n"
    
    return response + "\n업데이트된 정보로 더 정확한 추천을 제공해 드릴 수 있습니다!"


def _format_web_search_results(tool_data: dict, user_input: str) -> str:
    """Format web search results into informative response."""
    
    search_query = tool_data.get('search_query', '검색')
    summary = tool_data.get('summary', '')
    results = tool_data.get('results', [])
    
    if not summary and not results:
        return f"'{search_query}'에 대한 최신 정보를 찾지 못했습니다. 다른 키워드로 검색해 보시겠어요?"
    
    response = f"'{search_query}'에 대한 최신 정보를 찾았습니다.\n\n"
    
    if summary:
        response += f"📝 요약: {summary}\n\n"
    
    if results:
        response += "관련 정보:\n"
        for i, item in enumerate(results[:3], 1):
            title = item.get('title', f'결과 {i}')
            response += f"{i}. {title}\n"
    
    return response


def _format_calculator_results(tool_data: dict, user_input: str) -> str:
    """Format calculator results into clear explanation."""
    
    calculation_type = tool_data.get('calculation_type', '계산')
    result = tool_data.get('result', 0)
    inputs = tool_data.get('inputs', {})
    breakdown = tool_data.get('breakdown', {})
    
    response = f"{calculation_type} 결과입니다.\n\n"
    
    # Show main result
    if calculation_type in ['budget_calculation', 'cost_estimation']:
        response += f"💰 총 예상 비용: {result:,}만원\n\n"
    else:
        response += f"📊 계산 결과: {result}\n\n"
    
    # Show breakdown if available
    if breakdown:
        response += "상세 내역:\n"
        for category, amount in breakdown.items():
            category_name = {
                'venue': '예식장',
                'catering': '식음료',
                'photography': '촬영',
                'misc': '기타'
            }.get(category, category)
            response += f"• {category_name}: {amount:,}만원\n"
    
    return response + "\n예산 계획에 도움이 되셨기를 바랍니다!"


def _generate_recommendation_response(recommendations: list, state: dict) -> str:
    """Generate response based on recommendation results."""
    
    if not recommendations:
        return "현재 추천할 수 있는 항목이 없습니다. 더 구체적인 요청을 해주시면 더 나은 추천을 제공해 드릴 수 있습니다."
    
    response = "맞춤형 추천을 준비했습니다.\n\n"
    
    for i, rec in enumerate(recommendations[:5], 1):
        name = rec.get('name', f'추천 {i}')
        rating = rec.get('rating', 0)
        reason = rec.get('reason', '')
        
        response += f"{i}. {name}"
        if rating > 0:
            response += f" (평점: {rating}⭐)"
        if reason:
            response += f"\n   {reason}"
        response += "\n\n"
    
    return response + "더 자세한 정보나 다른 추천이 필요하시면 말씀해 주세요!"


def _generate_fallback_response(state: dict) -> str:
    """Generate fallback response when no primary content is available."""
    
    user_input = state.get('user_input', '')
    user_memo = state.get('user_memo', {})
    profile = user_memo.get('profile', {})
    
    user_name = profile.get('name', '')
    name_part = f" {user_name}님" if user_name else ""
    
    fallback_responses = [
        f"안녕하세요{name_part}! 요청을 처리하고 있는데, 조금 더 구체적으로 말씀해 주시면 더 정확한 도움을 드릴 수 있을 것 같습니다.",
        f"죄송합니다{name_part}. 현재 요청을 완전히 처리할 수 없었습니다. 다른 방식으로 질문해 주시거나, 구체적인 정보를 더 제공해 주시면 도움이 될 것 같습니다.",
        f"처리 중에 일시적인 문제가 있었습니다{name_part}. 잠시 후 다시 시도해 주시거나, 다른 방식으로 도움을 요청해 주세요."
    ]
    
    # Select response based on input content
    selected_response = fallback_responses[hash(user_input) % len(fallback_responses)]
    
    # Add wedding context if appropriate
    if profile.get('wedding_date'):
        selected_response += f"\n\n결혼 준비와 관련해서 도움이 필요하시면 언제든 말씀해 주세요!"
    
    return selected_response


def _generate_minimal_response(state: dict) -> str:
    """Generate minimal response for edge cases."""
    
    return "요청을 확인했습니다. 추가로 도움이 필요하시면 언제든 말씀해 주세요."


def _apply_content_filtering(content: str, strategy: dict, state: dict) -> str:
    """Apply intelligent content filtering and quality improvements."""
    
    if not content or not content.strip():
        return content
    
    # Remove excessive whitespace and normalize formatting
    filtered_content = re.sub(r'\n\s*\n\s*\n+', '\n\n', content.strip())
    
    # Apply strategy-specific filtering
    response_type = strategy.get('response_type', 'hybrid')
    
    if response_type == 'data_driven':
        # Keep technical details but make them readable
        filtered_content = _enhance_technical_content(filtered_content)
        
    elif response_type == 'conversational':
        # Ensure natural flow and readability
        filtered_content = _enhance_conversational_content(filtered_content)
    
    # Apply general quality improvements
    filtered_content = _apply_general_quality_improvements(filtered_content)
    
    return filtered_content


def _enhance_technical_content(content: str) -> str:
    """Enhance technical content for better user comprehension."""
    
    # Add structure to technical information
    if '결과' in content and '예상' in content:
        content = re.sub(r'(\d+(?:,\d+)*(?:\.\d+)?)', r'**\1**', content)  # Bold numbers
    
    return content


def _enhance_conversational_content(content: str) -> str:
    """Enhance conversational content for natural flow."""
    
    # Ensure proper punctuation
    content = re.sub(r'([.!?])\s*([가-힣])', r'\1 \2', content)
    
    return content


def _apply_general_quality_improvements(content: str) -> str:
    """Apply general quality improvements to any content."""
    
    # Fix common formatting issues
    content = re.sub(r'\s+([.!?])', r'\1', content)  # Remove spaces before punctuation
    content = re.sub(r'([.!?])([가-힣])', r'\1 \2', content)  # Add spaces after punctuation
    
    return content.strip()


def _apply_personalization_and_context(content: str, user_memo: dict, state: dict) -> str:
    """Apply personalization and contextual enhancements to the response."""
    
    if not content:
        return content
    
    profile = user_memo.get('profile', {})
    user_name = profile.get('name')
    wedding_date = profile.get('wedding_date')
    
    # Apply personalization without being intrusive
    personalized_content = content
    
    # Add contextual wedding information if relevant and not already present
    if wedding_date and '결혼' not in content and len(content) > 50:
        if not content.endswith(('!', '?', '.')):
            personalized_content += "."
        personalized_content += f"\n\n참고로 {wedding_date} 결혼식을 준비하고 계시니, 관련해서 추가로 필요한 것이 있으시면 언제든 말씀해 주세요!"
    
    return personalized_content


def _integrate_tool_results_and_updates(response: str, tool_results: dict, memo_updates: list, 
                                      update_summary: dict, strategy: dict) -> str:
    """Integrate relevant tool results and memory updates into the response."""
    
    if not response:
        return response
    
    integration_parts = []
    
    # Add memory update acknowledgment if significant updates were made
    if memo_updates and update_summary.get('total_updates', 0) > 0:
        profile_updates = update_summary.get('profile_updates', 0)
        if profile_updates > 0:
            integration_parts.append(f"입력해 주신 정보 {profile_updates}개를 저장했습니다.")
    
    # Add tool execution acknowledgment for data-driven responses
    if strategy.get('include_tool_details') and tool_results:
        successful_tools = [name for name, result in tool_results.items() 
                          if result.get('success', False)]
        if successful_tools and 'db_query' in successful_tools:
            integration_parts.append("데이터베이스에서 최신 정보를 가져왔습니다.")
    
    # Integrate additional parts
    if integration_parts:
        # Add to beginning or end based on response type
        integrated_response = response
        if strategy.get('response_type') == 'data_driven':
            integrated_response = '\n\n'.join(integration_parts) + '\n\n' + response
        else:
            integrated_response = response + '\n\n' + ' '.join(integration_parts)
        return integrated_response
    
    return response


def _apply_final_formatting(response: str, strategy: dict, user_memo: dict) -> str:
    """Apply final formatting and style adjustments to the response."""
    
    if not response:
        return "죄송합니다. 응답을 생성하는 중에 문제가 발생했습니다. 다시 시도해 주세요."
    
    formatting_style = strategy.get('formatting_style', 'conversational')
    
    # Apply style-specific formatting
    if formatting_style == 'natural':
        # Keep natural conversational style
        formatted_response = response
    elif formatting_style == 'structured':
        # Add more structure with bullet points or numbering
        formatted_response = _add_structure_formatting(response)
    else:
        # Default conversational formatting
        formatted_response = response
    
    # Ensure proper ending
    if not formatted_response.rstrip().endswith(('!', '?', '.', '요', '요.', '니다', '니다.')):
        formatted_response = formatted_response.rstrip() + '.'
    
    # Final cleanup
    formatted_response = re.sub(r'\n{3,}', '\n\n', formatted_response)
    
    return formatted_response.strip()


def _add_structure_formatting(response: str) -> str:
    """Add structured formatting to responses."""
    
    # This is a simple implementation - can be enhanced based on needs
    return response


def _generate_response_metadata(final_response: str, strategy: dict, state: dict) -> dict:
    """Generate comprehensive metadata about the response generation process."""
    
    metadata = {
        'response_length': len(final_response),
        'word_count': len(final_response.split()),
        'sentence_count': len([s for s in final_response.split('.') if s.strip()]),
        'personalization_applied': bool(state.get('user_memo', {}).get('profile', {}).get('name')),
        'quality_score': _calculate_quality_score(final_response, strategy, state),
        'response_type': strategy.get('response_type', 'unknown'),
        'formatting_style': strategy.get('formatting_style', 'conversational'),
        'sources_integrated': len(strategy.get('sources_used', [])),
        'processing_notes': [
            f"Primary source: {strategy.get('primary_source')}",
            f"Response type: {strategy.get('response_type')}",
            f"Personalization level: {strategy.get('personalization_level')}"
        ]
    }
    
    return metadata


def _calculate_quality_score(response: str, strategy: dict, state: dict) -> float:
    """Calculate a quality score for the generated response."""
    
    if not response or len(response.strip()) < 10:
        return 0.0
    
    score = 2.5  # Base score
    
    # Length appropriateness (not too short, not too long)
    length = len(response)
    if 50 <= length <= 500:
        score += 0.5
    elif 500 < length <= 1000:
        score += 0.3
    
    # Content relevance indicators
    user_input = state.get('user_input', '').lower()
    response_lower = response.lower()
    
    # Check for input relevance
    if any(word in response_lower for word in user_input.split() if len(word) > 2):
        score += 0.5
    
    # Check for helpful content indicators
    helpful_indicators = ['추천', '도움', '정보', '결과', '확인', '완료']
    if any(indicator in response for indicator in helpful_indicators):
        score += 0.3
    
    # Check for engagement elements
    engagement_indicators = ['궁금', '필요', '말씀', '연락', '추가']
    if any(indicator in response for indicator in engagement_indicators):
        score += 0.2
    
    # Personalization bonus
    if strategy.get('personalization_level') == 'high':
        score += 0.3
    
    # Cap at 5.0
    return min(score, 5.0)


def _handle_critical_error(state: dict, error_message: str, metadata: dict) -> dict:
    """Handle critical errors that prevent response generation."""
    
    metadata['error_type'] = 'critical_error'
    metadata['error_message'] = error_message
    
    state['final_response'] = "시스템 처리 중 오류가 발생했습니다. 잠시 후 다시 시도해 주세요."
    state['response_metadata'] = metadata
    state['status'] = "error"
    state['reason'] = error_message
    
    print(f"❌ 응답 생성 중 치명적 오류: {error_message}")
    
    return state


def _handle_generation_error(state: dict, error_message: str, metadata: dict) -> dict:
    """Handle general response generation errors."""
    
    metadata['error_type'] = 'generation_error'
    metadata['error_message'] = error_message
    
    # Try to provide a meaningful fallback response
    user_memo = state.get('user_memo', {})
    profile = user_memo.get('profile', {})
    user_name = profile.get('name', '')
    
    fallback_response = f"안녕하세요{f' {user_name}님' if user_name else ''}! " \
                       f"요청을 처리하는 중에 일시적인 문제가 발생했습니다. " \
                       f"다시 시도해 주시거나 다른 방식으로 질문해 주세요."
    
    state['final_response'] = fallback_response
    state['response_metadata'] = metadata
    state['status'] = "error"
    state['reason'] = f"Response generation error: {error_message}"
    
    print(f"⚠️ 응답 생성 오류 (폴백 사용): {error_message}")
    
    return state

In [50]:
# Cell 14: response_generation_node 테스트

def test_response_generation_node():
    """Comprehensive test suite for response_generation_node functionality."""
    
    print("🧪 response_generation_node 테스트 시작\n")
    
    test_cases = [
        {
            "name": "도구 실행 결과 기반 응답 생성",
            "scenario": "tool_results_primary",
            "state": _create_test_state_tool_results()
        },
        {
            "name": "일반 응답 기반 최종 생성",
            "scenario": "general_response_primary",
            "state": _create_test_state_general_response()
        },
        {
            "name": "복합 소스 통합 응답 생성",
            "scenario": "multi_source_integration",
            "state": _create_test_state_multi_source()
        },
        {
            "name": "개인화 적용 응답 생성",
            "scenario": "personalized_response",
            "state": _create_test_state_personalized()
        },
        {
            "name": "메모리 업데이트 통합 응답",
            "scenario": "memo_update_integration",
            "state": _create_test_state_memo_updates()
        },
        {
            "name": "에러 상황 처리 및 폴백",
            "scenario": "error_handling",
            "state": _create_test_state_error_cases()
        },
        {
            "name": "빈 콘텐츠 처리",
            "scenario": "empty_content_handling",
            "state": _create_test_state_empty_content()
        },
        {
            "name": "품질 점수 평가",
            "scenario": "quality_assessment",
            "state": _create_test_state_quality_check()
        }
    ]
    
    for i, test_case in enumerate(test_cases, 1):
        print(f"{'='*60}")
        print(f"테스트 {i}: {test_case['name']}")
        print(f"시나리오: {test_case['scenario']}")
        print(f"라우팅 결정: {test_case['state'].get('routing_decision', 'N/A')}")
        print("-" * 40)
        
        # Execute response_generation_node
        try:
            initial_state = test_case['state'].copy()
            result_state = response_generation_node(test_case['state'])
            
            # Analyze results
            _analyze_response_generation_results(test_case, initial_state, result_state)
            
        except Exception as e:
            print(f"❌ 테스트 실행 중 오류: {str(e)}")
            import traceback
            traceback.print_exc()
            
        print("-" * 60)
        print()
    
    print("🧪 response_generation_node 테스트 완료")


def _create_test_state_tool_results() -> dict:
    """Create test state for tool results primary scenario."""
    
    return {
        "user_id": "test_user_001",
        "user_input": "강남구 웨딩홀 추천해주세요",
        "routing_decision": "tool_execution",
        "response_content": "",
        "tool_results": {
            "db_query_tool": {
                "success": True,
                "data": {
                    "query_type": "venue_search",
                    "vendor_type": "venue",
                    "region": "강남구",
                    "results_count": 12,
                    "results": [
                        {"name": "그랜드 웨딩홀", "rating": 4.5, "price_range": "300-500만원"},
                        {"name": "로얄 컨벤션", "rating": 4.3, "price_range": "250-400만원"},
                        {"name": "엘리시안 웨딩홀", "rating": 4.7, "price_range": "400-600만원"}
                    ]
                }
            },
            "web_search_tool": {
                "success": True,
                "data": {
                    "search_query": "강남구 웨딩홀 2025",
                    "summary": "강남 지역 프리미엄 웨딩홀들이 인기. 예약은 3-6개월 전 추천",
                    "results": [
                        {"title": "2025 강남 웨딩홀 TOP 10", "url": "example.com"},
                        {"title": "웨딩홀 선택 가이드", "url": "example2.com"}
                    ]
                }
            }
        },
        "user_memo": {
            "profile": {
                "user_id": "test_user_001",
                "name": "김철수",
                "wedding_date": "2025-10-15",
                "total_budget_manwon": 5000
            }
        },
        "memo_updates_made": [],
        "update_summary": {}
    }


def _create_test_state_general_response() -> dict:
    """Create test state for general response primary scenario."""
    
    return {
        "user_id": "test_user_002",
        "user_input": "결혼 준비가 너무 힘들어요",
        "routing_decision": "general_response",
        "response_content": "결혼 준비가 힘드시죠? 정말 많은 분들이 같은 고민을 하세요. 결혼식 준비는 단계적으로 접근하는 것이 중요해요. 먼저 예산과 날짜, 하객 규모를 정하고, 그 다음에 예식장과 스튜디오를 선택하시는 것을 추천드려요.",
        "tool_results": {},
        "user_memo": {
            "profile": {
                "user_id": "test_user_002",
                "name": "이영희",
                "wedding_date": "2025-12-01"
            }
        },
        "memo_updates_made": [],
        "update_summary": {}
    }


def _create_test_state_multi_source() -> dict:
    """Create test state for multi-source integration scenario."""
    
    return {
        "user_id": "test_user_003",
        "user_input": "예산 3000만원으로 설정하고 강남 스튜디오도 찾아주세요",
        "routing_decision": "tool_execution",
        "response_content": "예산이 설정되었고 스튜디오 검색을 진행하겠습니다.",
        "tool_results": {
            "user_db_update_tool": {
                "success": True,
                "data": {
                    "updated_fields": {
                        "total_budget_manwon": 3000
                    },
                    "operation": "profile_update"
                }
            },
            "db_query_tool": {
                "success": True,
                "data": {
                    "query_type": "vendor_search",
                    "vendor_type": "studio",
                    "region": "강남구",
                    "results_count": 8,
                    "results": [
                        {"name": "아트 스튜디오", "rating": 4.4},
                        {"name": "모던 포토", "rating": 4.2}
                    ]
                }
            }
        },
        "user_memo": {
            "profile": {
                "user_id": "test_user_003",
                "name": "박민수"
            }
        },
        "memo_updates_made": [
            {
                "category": "profile",
                "field": "total_budget_manwon",
                "old_value": None,
                "new_value": 3000
            }
        ],
        "update_summary": {
            "total_updates": 1,
            "profile_updates": 1
        }
    }


def _create_test_state_personalized() -> dict:
    """Create test state for personalized response scenario."""
    
    return {
        "user_id": "test_user_004",
        "user_input": "안녕하세요",
        "routing_decision": "general_response",
        "response_content": "안녕하세요! 오늘 하루는 어떻게 보내고 계신가요?",
        "tool_results": {},
        "user_memo": {
            "profile": {
                "user_id": "test_user_004",
                "name": "최지영",
                "wedding_date": "2025-11-20",
                "total_budget_manwon": 4500,
                "guest_count": 120
            },
            "preferences": {
                "style_preferences": ["모던한 스타일"],
                "location_preferences": ["홍대", "강남"]
            }
        },
        "memo_updates_made": [],
        "update_summary": {}
    }


def _create_test_state_memo_updates() -> dict:
    """Create test state for memo update integration scenario."""
    
    return {
        "user_id": "test_user_005",
        "user_input": "결혼일을 크리스마스로 변경하고 예산도 늘려주세요",
        "routing_decision": "tool_execution",
        "response_content": "",
        "tool_results": {
            "user_db_update_tool": {
                "success": True,
                "data": {
                    "updated_fields": {
                        "wedding_date": "2025-12-25",
                        "total_budget_manwon": 6000
                    }
                }
            }
        },
        "user_memo": {
            "profile": {
                "user_id": "test_user_005",
                "name": "정현우"
            }
        },
        "memo_updates_made": [
            {
                "category": "profile",
                "field": "wedding_date",
                "old_value": "2025-10-15",
                "new_value": "2025-12-25"
            },
            {
                "category": "profile", 
                "field": "total_budget_manwon",
                "old_value": 4000,
                "new_value": 6000
            }
        ],
        "update_summary": {
            "total_updates": 2,
            "profile_updates": 2,
            "preference_updates": 0
        }
    }


def _create_test_state_error_cases() -> dict:
    """Create test state for error handling scenarios."""
    
    return {
        "user_id": "test_user_006",
        "user_input": "스튜디오 찾아주세요",
        "routing_decision": "tool_execution",
        "response_content": "",
        "tool_results": {
            "db_query_tool": {
                "success": False,
                "error_message": "Database connection failed"
            },
            "web_search_tool": {
                "success": False,
                "error_message": "Network timeout"
            }
        },
        "user_memo": {
            "profile": {
                "user_id": "test_user_006",
                "name": "홍길동"
            }
        },
        "memo_updates_made": [],
        "update_summary": {}
    }


def _create_test_state_empty_content() -> dict:
    """Create test state for empty content handling."""
    
    return {
        "user_id": "test_user_007",
        "user_input": "도움이 필요해요",
        "routing_decision": "general_response",
        "response_content": "",  # Empty content
        "tool_results": {},
        "user_memo": {
            "profile": {
                "user_id": "test_user_007"
            }
        },
        "memo_updates_made": [],
        "update_summary": {}
    }


def _create_test_state_quality_check() -> dict:
    """Create test state for quality assessment scenario."""
    
    return {
        "user_id": "test_user_008",
        "user_input": "예산 계산해주세요",
        "routing_decision": "tool_execution",
        "response_content": "",
        "tool_results": {
            "calculator_tool": {
                "success": True,
                "data": {
                    "calculation_type": "budget_calculation",
                    "result": 4200,
                    "inputs": {
                        "guest_count": 100,
                        "cost_per_guest": 42
                    },
                    "breakdown": {
                        "venue": 2000,
                        "catering": 1800,
                        "photography": 300,
                        "misc": 100
                    }
                }
            }
        },
        "user_memo": {
            "profile": {
                "user_id": "test_user_008",
                "name": "김하나",
                "guest_count": 100
            }
        },
        "memo_updates_made": [],
        "update_summary": {}
    }


def _analyze_response_generation_results(test_case: dict, initial_state: dict, result_state: dict):
    """Analyze and display response generation test results."""
    
    status = result_state.get('status', 'unknown')
    scenario = test_case['scenario']
    
    print(f"📊 결과 분석:")
    print(f"   상태: {status}")
    
    if status in ["completed", "ok"]:
        print("   ✅ 성공적으로 완료됨")
        
        final_response = result_state.get('final_response', '')
        response_metadata = result_state.get('response_metadata', {})
        
        print(f"   📏 최종 응답 길이: {len(final_response)}자")
        print(f"   📝 단어 수: {response_metadata.get('word_count', 0)}개")
        print(f"   ⭐ 품질 점수: {response_metadata.get('quality_score', 0):.2f}/5.0")
        print(f"   🔧 주요 소스: {response_metadata.get('primary_content_source', 'unknown')}")
        
        # Display response preview
        preview = final_response[:150] + "..." if len(final_response) > 150 else final_response
        print(f"   💬 응답 미리보기:")
        print(f"      {preview}")
        
        # Scenario-specific validation
        _validate_scenario_specific_response(scenario, initial_state, result_state)
        
        # Quality assessment
        _assess_response_quality(scenario, final_response, response_metadata)
        
    elif status == "error":
        print("   ❌ 실행 실패")
        reason = result_state.get('reason', 'Unknown error')
        print(f"   🚨 실패 이유: {reason}")
        
        # Check if fallback response was provided
        final_response = result_state.get('final_response', '')
        if final_response:
            print(f"   🔄 폴백 응답 제공됨 (길이: {len(final_response)}자)")
    
    else:
        print(f"   ⚠️ 예상치 못한 상태: {status}")


def _validate_scenario_specific_response(scenario: str, initial_state: dict, result_state: dict):
    """Validate response based on specific scenario requirements."""
    
    final_response = result_state.get('final_response', '')
    response_metadata = result_state.get('response_metadata', {})
    
    if scenario == "tool_results_primary":
        _validate_tool_results_response(initial_state, result_state)
        
    elif scenario == "general_response_primary":
        _validate_general_response_processing(initial_state, result_state)
        
    elif scenario == "multi_source_integration":
        _validate_multi_source_integration(initial_state, result_state)
        
    elif scenario == "personalized_response":
        _validate_personalization_application(initial_state, result_state)
        
    elif scenario == "memo_update_integration":
        _validate_memo_update_integration(initial_state, result_state)
        
    elif scenario == "error_handling":
        _validate_error_handling(initial_state, result_state)
        
    elif scenario == "empty_content_handling":
        _validate_empty_content_handling(initial_state, result_state)


def _validate_tool_results_response(initial_state: dict, result_state: dict):
    """Validate tool results-based response generation."""
    
    final_response = result_state.get('final_response', '')
    tool_results = initial_state.get('tool_results', {})
    
    # Check if venue search results are reflected
    if 'db_query_tool' in tool_results:
        venue_data = tool_results['db_query_tool'].get('data', {})
        results_count = venue_data.get('results_count', 0)
        region = venue_data.get('region', '')
        
        if f"{results_count}" in final_response:
            print("     ✅ 검색 결과 수 포함됨")
        else:
            print("     ❌ 검색 결과 수 누락")
            
        if region in final_response:
            print("     ✅ 지역 정보 포함됨")
        else:
            print("     ❌ 지역 정보 누락")
    
    # Check for web search integration
    if 'web_search_tool' in tool_results:
        web_data = tool_results['web_search_tool'].get('data', {})
        if web_data.get('summary', '') and '최신' in final_response:
            print("     ✅ 웹 검색 정보 통합됨")
        else:
            print("     ⚠️ 웹 검색 정보 통합 미확인")


def _validate_general_response_processing(initial_state: dict, result_state: dict):
    """Validate general response processing."""
    
    initial_content = initial_state.get('response_content', '')
    final_response = result_state.get('final_response', '')
    
    # Check if original content is preserved and enhanced
    if initial_content and initial_content in final_response:
        print("     ✅ 원본 응답 내용 보존됨")
    else:
        print("     ❌ 원본 응답 내용 변경/손실")
    
    # Check for personalization enhancement
    user_memo = initial_state.get('user_memo', {})
    user_name = user_memo.get('profile', {}).get('name', '')
    wedding_date = user_memo.get('profile', {}).get('wedding_date', '')
    
    if wedding_date and wedding_date in final_response:
        print("     ✅ 개인화 정보(결혼일) 추가됨")
    else:
        print("     ⚠️ 개인화 정보 추가 미확인")


def _validate_multi_source_integration(initial_state: dict, result_state: dict):
    """Validate multi-source content integration."""
    
    final_response = result_state.get('final_response', '')
    tool_results = initial_state.get('tool_results', {})
    memo_updates = initial_state.get('memo_updates_made', [])
    
    sources_found = 0
    
    # Check for update confirmation
    if 'user_db_update_tool' in tool_results and any(word in final_response for word in ['업데이트', '설정', '저장']):
        sources_found += 1
        print("     ✅ 사용자 정보 업데이트 반영됨")
    
    # Check for search results
    if 'db_query_tool' in tool_results and any(word in final_response for word in ['스튜디오', '찾았습니다']):
        sources_found += 1
        print("     ✅ 검색 결과 반영됨")
    
    # Check for memo updates acknowledgment  
    if memo_updates and any(word in final_response for word in ['정보', '저장']):
        print("     ✅ 메모리 업데이트 반영됨")
    
    print(f"     📊 통합된 소스 수: {sources_found}")


def _validate_personalization_application(initial_state: dict, result_state: dict):
    """Validate personalization application."""
    
    final_response = result_state.get('final_response', '')
    user_memo = initial_state.get('user_memo', {})
    profile = user_memo.get('profile', {})
    
    personalization_score = 0
    
    # Check for name usage
    user_name = profile.get('name', '')
    if user_name and user_name in final_response:
        personalization_score += 1
        print("     ✅ 사용자 이름 활용됨")
    
    # Check for wedding date reference
    wedding_date = profile.get('wedding_date', '')
    if wedding_date and wedding_date in final_response:
        personalization_score += 1
        print("     ✅ 결혼일 정보 활용됨")
    
    # Check for budget reference
    budget = profile.get('total_budget_manwon')
    if budget and str(budget) in final_response:
        personalization_score += 1
        print("     ✅ 예산 정보 활용됨")
    
    print(f"     👤 개인화 점수: {personalization_score}/3")


def _validate_memo_update_integration(initial_state: dict, result_state: dict):
    """Validate memory update integration."""
    
    final_response = result_state.get('final_response', '')
    memo_updates = initial_state.get('memo_updates_made', [])
    update_summary = initial_state.get('update_summary', {})
    
    if not memo_updates:
        print("     ⚠️ 업데이트할 메모리 정보 없음")
        return
    
    # Check for update acknowledgment
    total_updates = update_summary.get('total_updates', 0)
    if any(word in final_response for word in ['업데이트', '변경', '저장']):
        print("     ✅ 업데이트 확인 메시지 포함")
    else:
        print("     ❌ 업데이트 확인 메시지 누락")
    
    # Check for specific field mentions
    updated_fields = []
    for update in memo_updates:
        field = update.get('field', '')
        new_value = update.get('new_value', '')
        
        if field == 'wedding_date' and str(new_value) in final_response:
            updated_fields.append('결혼일')
        elif field == 'total_budget_manwon' and str(new_value) in final_response:
            updated_fields.append('예산')
    
    print(f"     📝 반영된 업데이트 필드: {', '.join(updated_fields) if updated_fields else '없음'}")


def _validate_error_handling(initial_state: dict, result_state: dict):
    """Validate error handling and fallback responses."""
    
    status = result_state.get('status', '')
    final_response = result_state.get('final_response', '')
    tool_results = initial_state.get('tool_results', {})
    
    # All tools failed in this test case
    all_failed = all(not result.get('success', False) for result in tool_results.values())
    
    if all_failed:
        if final_response and len(final_response) > 10:
            print("     ✅ 실패 시 폴백 응답 제공됨")
        else:
            print("     ❌ 폴백 응답 부족")
        
        if any(word in final_response for word in ['죄송', '문제', '다시']):
            print("     ✅ 적절한 사과/안내 메시지")
        else:
            print("     ❌ 사과/안내 메시지 부족")


def _validate_empty_content_handling(initial_state: dict, result_state: dict):
    """Validate handling of empty content scenarios."""
    
    final_response = result_state.get('final_response', '')
    initial_content = initial_state.get('response_content', '')
    
    if not initial_content.strip():
        if final_response and len(final_response) > 20:
            print("     ✅ 빈 콘텐츠 시 적절한 폴백 제공")
        else:
            print("     ❌ 빈 콘텐츠 처리 부족")


def _assess_response_quality(scenario: str, final_response: str, metadata: dict):
    """Assess overall response quality."""
    
    quality_score = metadata.get('quality_score', 0)
    response_length = len(final_response)
    word_count = metadata.get('word_count', 0)
    
    print(f"   📏 품질 평가:")
    
    # Length assessment
    if 50 <= response_length <= 800:
        print("     ✅ 적절한 응답 길이")
    elif response_length < 50:
        print("     ⚠️ 응답이 너무 짧음")
    else:
        print("     ⚠️ 응답이 너무 김")
    
    # Quality score assessment
    if quality_score >= 4.0:
        print("     ✅ 높은 품질 점수")
    elif quality_score >= 3.0:
        print("     ✅ 양호한 품질 점수")
    elif quality_score >= 2.0:
        print("     ⚠️ 보통 품질 점수")
    else:
        print("     ❌ 낮은 품질 점수")
    
    # Content relevance (basic check)
    if any(word in final_response for word in ['결혼', '웨딩', '예식', '추천', '도움']):
        print("     ✅ 관련성 있는 내용")
    else:
        print("     ⚠️ 관련성 확인 필요")


# 전체 응답 샘플 표시 함수
def _display_sample_responses():
    """Display sample responses for manual review."""
    
    print("\n" + "="*60)
    print("📋 전체 응답 샘플 리뷰")
    print("="*60)
    
    sample_case = _create_test_state_tool_results()
    try:
        result = response_generation_node(sample_case)
        final_response = result.get('final_response', '응답 생성 실패')
        metadata = result.get('response_metadata', {})
        
        print(f"\n🎯 샘플 응답:")
        print(f"입력: {sample_case['user_input']}")
        print(f"응답: {final_response}")
        print(f"품질: {metadata.get('quality_score', 0):.2f}/5.0")
        print(f"길이: {len(final_response)}자")
        
    except Exception as e:
        print(f"샘플 생성 오류: {str(e)}")


# 테스트 실행 함수
if __name__ == "__main__":
    test_response_generation_node()
    
    # Uncomment to see sample responses
    _display_sample_responses()

🧪 response_generation_node 테스트 시작

테스트 1: 도구 실행 결과 기반 응답 생성
시나리오: tool_results_primary
라우팅 결정: tool_execution
----------------------------------------
✅ 최종 응답 생성 완료
   📏 응답 길이: 313자
   📍 주요 소스: tool_results
   ⭐ 품질 점수: 4.30/5.0
   🔧 사용된 소스: tool_execution
📊 결과 분석:
   상태: completed
   ✅ 성공적으로 완료됨
   📏 최종 응답 길이: 313자
   📝 단어 수: 71개
   ⭐ 품질 점수: 4.30/5.0
   🔧 주요 소스: tool_results
   💬 응답 미리보기:
      강남구 지역에서 venue 12곳을 찾았습니다.

추천 업체:
1. 그랜드 웨딩홀 (평점: 4.5⭐)
2. 로얄 컨벤션 (평점: 4.3⭐)
3. 엘리시안 웨딩홀 (평점: 4.7⭐)

더 자세한 정보가 필요하시면 말씀해 주세요!

'강남구 웨딩홀 2025'에 대한 최신 ...
     ✅ 검색 결과 수 포함됨
     ✅ 지역 정보 포함됨
     ✅ 웹 검색 정보 통합됨
   📏 품질 평가:
     ✅ 적절한 응답 길이
     ✅ 높은 품질 점수
     ✅ 관련성 있는 내용
------------------------------------------------------------

테스트 2: 일반 응답 기반 최종 생성
시나리오: general_response_primary
라우팅 결정: general_response
----------------------------------------
✅ 최종 응답 생성 완료
   📏 응답 길이: 118자
   📍 주요 소스: response_content
   ⭐ 품질 점수: 4.10/5.0
   🔧 사용된 소스: general_response
📊 결과 분석:
   상태: completed
   ✅ 성공적으로 완료

In [51]:
# Cell 15: recommendation_node 구현 (Pass 버전)

def recommendation_node(state: dict) -> dict:
    """
    Recommendation engine node for personalized wedding vendor suggestions (MVP: Pass-through implementation).
    
    This node is designed to provide intelligent, context-aware recommendations for wedding vendors
    based on user preferences, budget constraints, location requirements, and historical patterns.
    
    Planned Functionality (Future Implementation):
    - Machine Learning-based vendor matching using user profile and preference data
    - Budget-optimized recommendations with cost-benefit analysis
    - Location-based filtering with distance and accessibility considerations  
    - Style compatibility assessment between user preferences and vendor portfolios
    - Historical performance analysis and success rate tracking
    - Real-time availability integration and booking optimization
    - Multi-criteria decision analysis for complex trade-off scenarios
    
    Current MVP Status:
    This node currently implements a pass-through pattern for rapid development iteration.
    The recommendation logic will be developed in subsequent phases after core functionality
    stabilization and comprehensive user feedback collection.
    
    Args:
        state (dict): State containing user preferences, budget, location, and vendor requirements
        
    Returns:
        dict: State with recommendation routing decision (currently passes to tool_execution)
    """
    
    print("📋 recommendation_node: MVP에서는 tool_execution으로 라우팅합니다.")
    
    # MVP: Pass through to tool execution for basic vendor search
    state['routing_decision'] = "tool_execution"
    state['tools_to_execute'] = state.get('tools_to_execute', ['db_query_tool'])
    state['recommendation_status'] = "passed_to_tool_execution"
    state['status'] = "ok"
    
    return state

In [53]:
# Cell 16: error_handler_node 구현

def error_handler_node(state: dict) -> dict:
    """
    Centralized error processing and recovery node that manages all system failures across the LangGraph pipeline.
    
    This node serves as the comprehensive error management system that intercepts, analyzes, and resolves
    failures from any processing node in the wedding planner AI agent workflow. It acts as the critical
    safety net ensuring graceful degradation and user experience preservation even during system failures.
    
    Core Responsibilities:
    - Universal Error Interception: Captures and processes errors from all nodes (parsing, memo_check, 
      conditional_router, tool_execution, general_response, memo_update, response_generation)
    - Intelligent Error Classification: Categorizes errors by type, severity, and recovery potential
    - Context-Aware Error Analysis: Examines error context including user input, current state, and 
      processing history to determine optimal recovery strategies
    - Multi-Level Recovery Orchestration: Implements cascading recovery mechanisms from simple retries 
      to complete workflow rerouting
    - User Experience Protection: Ensures users receive meaningful, helpful responses even during 
      critical system failures
    - Comprehensive Error Logging: Records detailed error information for debugging and system improvement
    
    Error Classification System:
    1. Recoverable Errors:
       - Temporary Network Issues: Web search timeouts, API rate limits
       - Data Inconsistencies: Missing user profile fields, invalid date formats
       - Tool Execution Failures: Database connection issues, calculation errors
       - Memory System Glitches: File access problems, JSON parsing errors
    
    2. Workflow Errors:
       - Routing Failures: Invalid routing decisions, missing required parameters
       - State Corruption: Incomplete state transitions, missing required fields
       - Integration Issues: Tool result processing failures, format mismatches
    
    3. Critical System Errors:
       - Authentication Failures: User identification problems, session management issues
       - Resource Exhaustion: Memory limits, processing timeouts
       - Configuration Problems: Missing environment variables, invalid settings
    
    Recovery Strategy Framework:
    - Level 1 (Automatic Retry): Temporary failures with immediate retry capability
    - Level 2 (Alternative Routing): Workflow rerouting to alternative processing paths
    - Level 3 (Degraded Service): Reduced functionality with core services maintained
    - Level 4 (Graceful Failure): User-friendly error messages with guidance for resolution
    
    Error Context Analysis:
    - Source Node Identification: Determines which processing node triggered the error
    - State Integrity Assessment: Evaluates completeness and validity of current system state
    - User Impact Evaluation: Assesses severity of error impact on user experience
    - Recovery Feasibility Analysis: Determines optimal recovery approach based on error context
    
    User Experience Management:
    - Error Message Personalization: Customizes error responses based on user profile and context
    - Actionable Guidance Provision: Provides specific steps users can take to resolve issues
    - Workflow Continuation Assistance: Guides users through alternative approaches to achieve their goals
    - Professional Tone Maintenance: Ensures error messages maintain helpful, apologetic, and professional tone
    
    System Monitoring and Logging:
    - Comprehensive Error Documentation: Records full error context, stack traces, and system state
    - Performance Impact Tracking: Monitors error frequency and system degradation patterns
    - Recovery Success Metrics: Tracks effectiveness of different recovery strategies
    - User Satisfaction Preservation: Measures user experience quality during error scenarios
    
    Advanced Error Handling Features:
    - Intelligent Fallback Selection: Chooses optimal fallback responses based on user context and error type
    - State Preservation and Rollback: Maintains system integrity during recovery operations
    - Cross-Node Error Pattern Recognition: Identifies systemic issues requiring architectural attention
    - Progressive Error Escalation: Implements graduated response severity based on error persistence
    
    The error handler ensures system reliability and user satisfaction by transforming potentially
    frustrating failure scenarios into helpful, informative interactions that maintain user engagement
    and provide clear paths forward for achieving wedding planning objectives.
    
    Args:
        state (dict): Complete system state including error information, user context, processing history,
                     current node information, and any partial results from failed operations
        
    Returns:
        dict: Enhanced state with error resolution results, recovery actions taken, final user response,
              comprehensive error logging, and termination status for graph execution
    """
    
    # Extract comprehensive error context
    current_node = state.get('current_node', 'unknown')
    error_info = state.get('error_info', {})
    user_input = state.get('user_input', '')
    user_memo = state.get('user_memo', {})
    user_id = state.get('user_id')
    reason = state.get('reason', 'Unknown error occurred')
    status = state.get('status', 'unknown')
    
    # Initialize error processing metadata
    error_processing_metadata = {
        'error_handler_timestamp': datetime.now().isoformat(),
        'source_node': current_node,
        'error_severity': 'unknown',
        'recovery_attempted': False,
        'recovery_successful': False,
        'fallback_strategy': 'none',
        'user_impact_level': 'unknown'
    }
    
    try:
        print(f"🚨 error_handler_node 활성화: {current_node}에서 오류 발생")
        print(f"   오류 상태: {status}")
        print(f"   오류 이유: {reason}")
        
        # Perform comprehensive error analysis
        error_analysis = _analyze_error_context(state, current_node, reason, error_info)
        error_processing_metadata.update(error_analysis['metadata'])
        
        # Determine recovery strategy based on error analysis
        recovery_strategy = _determine_recovery_strategy(error_analysis, state)
        error_processing_metadata['recovery_strategy'] = recovery_strategy['strategy_type']
        
        # Attempt error recovery if feasible
        recovery_result = _attempt_error_recovery(recovery_strategy, state, error_analysis)
        error_processing_metadata.update(recovery_result['metadata'])
        
        # Generate user-friendly error response
        error_response = _generate_error_response(recovery_result, error_analysis, state)
        
        # Log comprehensive error information for debugging
        _log_error_information(error_analysis, recovery_result, state, error_processing_metadata)
        
        # Update state with error handling results
        state.update({
            'final_response': error_response['user_message'],
            'error_processing_metadata': error_processing_metadata,
            'error_analysis_result': error_analysis,
            'recovery_result': recovery_result,
            'status': 'error_handled',
            'termination_reason': 'error_recovery_completed',
            'processing_completed_at': datetime.now().isoformat()
        })
        
        # Log successful error handling
        strategy_name = recovery_strategy.get('strategy_type', 'unknown')
        user_impact = error_processing_metadata.get('user_impact_level', 'unknown')
        
        print(f"✅ 에러 처리 완료")
        print(f"   🔧 복구 전략: {strategy_name}")
        print(f"   👤 사용자 영향도: {user_impact}")
        print(f"   💬 응답 길이: {len(error_response['user_message'])}자")
        
        return state
        
    except Exception as critical_error:
        # Handle catastrophic failures in error handler itself
        return _handle_critical_error_handler_failure(state, critical_error, error_processing_metadata)


def _analyze_error_context(state: dict, source_node: str, reason: str, error_info: dict) -> dict:
    """Perform comprehensive analysis of error context and classification."""
    
    analysis = {
        'error_type': 'unknown',
        'severity_level': 'medium',
        'recovery_feasible': True,
        'user_impact': 'moderate',
        'source_analysis': {},
        'state_integrity': 'unknown',
        'metadata': {}
    }
    
    # Analyze error by source node
    if source_node == 'parsing_node':
        analysis.update(_analyze_parsing_node_error(reason, state))
    elif source_node == 'memo_check_node':
        analysis.update(_analyze_memo_check_error(reason, state))
    elif source_node == 'conditional_router':
        analysis.update(_analyze_router_error(reason, state))
    elif source_node == 'tool_execution_node':
        analysis.update(_analyze_tool_execution_error(reason, state))
    elif source_node == 'general_response_node':
        analysis.update(_analyze_general_response_error(reason, state))
    elif source_node == 'memo_update_node':
        analysis.update(_analyze_memo_update_error(reason, state))
    elif source_node == 'response_generation_node':
        analysis.update(_analyze_response_generation_error(reason, state))
    else:
        analysis.update(_analyze_unknown_node_error(reason, state))
    
    # Assess state integrity
    analysis['state_integrity'] = _assess_state_integrity(state)
    
    # Update metadata
    analysis['metadata'] = {
        'error_classification_timestamp': datetime.now().isoformat(),
        'error_severity': analysis['severity_level'],
        'user_impact_level': analysis['user_impact'],
        'recovery_feasibility': analysis['recovery_feasible']
    }
    
    return analysis


def _analyze_parsing_node_error(reason: str, state: dict) -> dict:
    """Analyze parsing node specific errors."""
    
    if 'No user input' in reason:
        return {
            'error_type': 'input_validation_failure',
            'severity_level': 'low',
            'recovery_feasible': True,
            'user_impact': 'low',
            'source_analysis': {'issue': 'empty_input', 'recovery': 'request_input'}
        }
    elif 'parsing' in reason.lower():
        return {
            'error_type': 'intent_parsing_failure',
            'severity_level': 'medium',
            'recovery_feasible': True,
            'user_impact': 'moderate',
            'source_analysis': {'issue': 'parsing_failed', 'recovery': 'fallback_to_general'}
        }
    else:
        return {
            'error_type': 'parsing_system_error',
            'severity_level': 'high',
            'recovery_feasible': False,
            'user_impact': 'high',
            'source_analysis': {'issue': 'system_failure', 'recovery': 'graceful_degradation'}
        }


def _analyze_memo_check_error(reason: str, state: dict) -> dict:
    """Analyze memory check node specific errors."""
    
    if 'user_id' in reason.lower():
        return {
            'error_type': 'user_identification_failure',
            'severity_level': 'high',
            'recovery_feasible': False,
            'user_impact': 'high',
            'source_analysis': {'issue': 'no_user_id', 'recovery': 'session_restart_required'}
        }
    elif 'file' in reason.lower() or 'memory' in reason.lower():
        return {
            'error_type': 'memory_system_failure',
            'severity_level': 'medium',
            'recovery_feasible': True,
            'user_impact': 'moderate',
            'source_analysis': {'issue': 'memory_access_failed', 'recovery': 'create_new_memory'}
        }
    else:
        return {
            'error_type': 'memo_system_error',
            'severity_level': 'medium',
            'recovery_feasible': True,
            'user_impact': 'moderate',
            'source_analysis': {'issue': 'unknown_memo_error', 'recovery': 'bypass_memory'}
        }


def _analyze_router_error(reason: str, state: dict) -> dict:
    """Analyze routing node specific errors."""
    
    return {
        'error_type': 'routing_decision_failure',
        'severity_level': 'medium',
        'recovery_feasible': True,
        'user_impact': 'moderate',
        'source_analysis': {'issue': 'routing_failed', 'recovery': 'default_to_general_response'}
    }


def _analyze_tool_execution_error(reason: str, state: dict) -> dict:
    """Analyze tool execution node specific errors."""
    
    if 'database' in reason.lower():
        return {
            'error_type': 'database_connection_failure',
            'severity_level': 'high',
            'recovery_feasible': True,
            'user_impact': 'high',
            'source_analysis': {'issue': 'db_connection_failed', 'recovery': 'use_fallback_data'}
        }
    elif 'network' in reason.lower() or 'timeout' in reason.lower():
        return {
            'error_type': 'network_connectivity_issue',
            'severity_level': 'medium',
            'recovery_feasible': True,
            'user_impact': 'moderate',
            'source_analysis': {'issue': 'network_failure', 'recovery': 'retry_or_fallback'}
        }
    else:
        return {
            'error_type': 'tool_execution_failure',
            'severity_level': 'medium',
            'recovery_feasible': True,
            'user_impact': 'moderate',
            'source_analysis': {'issue': 'tool_failed', 'recovery': 'alternative_approach'}
        }


def _analyze_general_response_error(reason: str, state: dict) -> dict:
    """Analyze general response node specific errors."""
    
    return {
        'error_type': 'response_generation_failure',
        'severity_level': 'low',
        'recovery_feasible': True,
        'user_impact': 'low',
        'source_analysis': {'issue': 'response_failed', 'recovery': 'simple_fallback'}
    }


def _analyze_memo_update_error(reason: str, state: dict) -> dict:
    """Analyze memory update node specific errors."""
    
    if 'conflict' in reason.lower():
        return {
            'error_type': 'memory_conflict_resolution_needed',
            'severity_level': 'low',
            'recovery_feasible': True,
            'user_impact': 'low',
            'source_analysis': {'issue': 'user_confirmation_needed', 'recovery': 'request_user_confirmation'}
        }
    else:
        return {
            'error_type': 'memory_update_failure',
            'severity_level': 'medium',
            'recovery_feasible': True,
            'user_impact': 'moderate',
            'source_analysis': {'issue': 'update_failed', 'recovery': 'proceed_without_update'}
        }


def _analyze_response_generation_error(reason: str, state: dict) -> dict:
    """Analyze response generation node specific errors."""
    
    return {
        'error_type': 'final_response_generation_failure',
        'severity_level': 'medium',
        'recovery_feasible': True,
        'user_impact': 'moderate',
        'source_analysis': {'issue': 'response_generation_failed', 'recovery': 'basic_response_fallback'}
    }


def _analyze_unknown_node_error(reason: str, state: dict) -> dict:
    """Analyze errors from unknown or unspecified nodes."""
    
    return {
        'error_type': 'unknown_system_error',
        'severity_level': 'high',
        'recovery_feasible': False,
        'user_impact': 'high',
        'source_analysis': {'issue': 'unknown_failure', 'recovery': 'system_restart_recommended'}
    }


def _assess_state_integrity(state: dict) -> str:
    """Assess the integrity and completeness of the current state."""
    
    critical_fields = ['user_id', 'user_input']
    missing_critical = [field for field in critical_fields if not state.get(field)]
    
    if missing_critical:
        return 'corrupted'
    elif state.get('user_memo') and state.get('processing_timestamp'):
        return 'good'
    else:
        return 'partial'


def _determine_recovery_strategy(error_analysis: dict, state: dict) -> dict:
    """Determine the optimal recovery strategy based on error analysis."""
    
    strategy = {
        'strategy_type': 'graceful_failure',
        'recovery_actions': [],
        'fallback_approach': 'apologetic_response',
        'retry_feasible': False
    }
    
    error_type = error_analysis.get('error_type', 'unknown')
    severity = error_analysis.get('severity_level', 'high')
    recovery_feasible = error_analysis.get('recovery_feasible', False)
    
    if recovery_feasible and severity in ['low', 'medium']:
        if error_type == 'input_validation_failure':
            strategy.update({
                'strategy_type': 'input_correction',
                'fallback_approach': 'request_clarification',
                'recovery_actions': ['request_valid_input']
            })
        elif error_type in ['database_connection_failure', 'network_connectivity_issue']:
            strategy.update({
                'strategy_type': 'alternative_data_source',
                'fallback_approach': 'cached_or_fallback_data',
                'recovery_actions': ['use_fallback_data', 'inform_user_of_limitations']
            })
        elif error_type == 'memory_conflict_resolution_needed':
            strategy.update({
                'strategy_type': 'user_confirmation_request',
                'fallback_approach': 'conflict_resolution_dialog',
                'recovery_actions': ['present_conflict_options']
            })
        else:
            strategy.update({
                'strategy_type': 'workflow_rerouting',
                'fallback_approach': 'alternative_processing_path',
                'recovery_actions': ['reroute_to_general_response']
            })
    
    return strategy


def _attempt_error_recovery(recovery_strategy: dict, state: dict, error_analysis: dict) -> dict:
    """Attempt to recover from the error using the determined strategy."""
    
    recovery_result = {
        'recovery_attempted': True,
        'recovery_successful': False,
        'actions_taken': [],
        'fallback_content': None,
        'metadata': {}
    }
    
    strategy_type = recovery_strategy.get('strategy_type', 'graceful_failure')
    recovery_actions = recovery_strategy.get('recovery_actions', [])
    
    try:
        if strategy_type == 'input_correction':
            recovery_result.update(_attempt_input_correction_recovery(state, recovery_actions))
        elif strategy_type == 'alternative_data_source':
            recovery_result.update(_attempt_alternative_data_recovery(state, recovery_actions))
        elif strategy_type == 'user_confirmation_request':
            recovery_result.update(_attempt_user_confirmation_recovery(state, recovery_actions))
        elif strategy_type == 'workflow_rerouting':
            recovery_result.update(_attempt_workflow_rerouting_recovery(state, recovery_actions))
        else:
            recovery_result.update(_attempt_graceful_failure_recovery(state, error_analysis))
        
        recovery_result['metadata'] = {
            'recovery_strategy_used': strategy_type,
            'recovery_timestamp': datetime.now().isoformat()
        }
        
    except Exception as recovery_error:
        recovery_result.update({
            'recovery_successful': False,
            'recovery_error': str(recovery_error),
            'actions_taken': ['recovery_failed']
        })
    
    return recovery_result


def _attempt_input_correction_recovery(state: dict, actions: list) -> dict:
    """Attempt recovery by requesting input correction."""
    
    return {
        'recovery_successful': True,
        'actions_taken': ['input_clarification_requested'],
        'fallback_content': {
            'message_type': 'clarification_request',
            'user_guidance': 'input_improvement_needed'
        }
    }


def _attempt_alternative_data_recovery(state: dict, actions: list) -> dict:
    """Attempt recovery using alternative data sources."""
    
    return {
        'recovery_successful': True,
        'actions_taken': ['fallback_data_used'],
        'fallback_content': {
            'message_type': 'limited_service_notification',
            'user_guidance': 'reduced_functionality_explained'
        }
    }


def _attempt_user_confirmation_recovery(state: dict, actions: list) -> dict:
    """Attempt recovery by requesting user confirmation."""
    
    return {
        'recovery_successful': True,
        'actions_taken': ['user_confirmation_requested'],
        'fallback_content': {
            'message_type': 'confirmation_request',
            'user_guidance': 'choice_presentation'
        }
    }


def _attempt_workflow_rerouting_recovery(state: dict, actions: list) -> dict:
    """Attempt recovery by rerouting to alternative workflow."""
    
    return {
        'recovery_successful': True,
        'actions_taken': ['workflow_rerouted'],
        'fallback_content': {
            'message_type': 'alternative_approach',
            'user_guidance': 'alternative_solution_offered'
        }
    }


def _attempt_graceful_failure_recovery(state: dict, error_analysis: dict) -> dict:
    """Attempt graceful failure handling with helpful user guidance."""
    
    return {
        'recovery_successful': False,
        'actions_taken': ['graceful_failure_prepared'],
        'fallback_content': {
            'message_type': 'apologetic_failure',
            'user_guidance': 'alternative_suggestions_provided'
        }
    }


def _generate_error_response(recovery_result: dict, error_analysis: dict, state: dict) -> dict:
    """Generate user-friendly error response based on recovery results."""
    
    user_memo = state.get('user_memo', {})
    profile = user_memo.get('profile', {})
    user_name = profile.get('name', '')
    user_input = state.get('user_input', '')
    
    # Personalization elements
    name_part = f" {user_name}님" if user_name else ""
    
    # Determine response based on recovery result
    fallback_content = recovery_result.get('fallback_content', {})
    message_type = fallback_content.get('message_type', 'apologetic_failure')
    
    if message_type == 'clarification_request':
        user_message = _generate_clarification_request_message(user_input, name_part)
    elif message_type == 'limited_service_notification':
        user_message = _generate_limited_service_message(user_input, name_part)
    elif message_type == 'confirmation_request':
        user_message = _generate_confirmation_request_message(state, name_part)
    elif message_type == 'alternative_approach':
        user_message = _generate_alternative_approach_message(user_input, name_part)
    else:
        user_message = _generate_apologetic_failure_message(user_input, name_part, error_analysis)
    
    return {
        'user_message': user_message,
        'message_type': message_type,
        'personalization_applied': bool(user_name)
    }


def _generate_clarification_request_message(user_input: str, name_part: str) -> str:
    """Generate clarification request message."""
    
    return f"""안녕하세요{name_part}! 요청을 처리하는 중에 입력 내용을 정확히 파악하기 어려웠습니다.

조금 더 구체적으로 말씀해 주시면 더 정확한 도움을 드릴 수 있을 것 같아요.

예를 들어:
• 어떤 업체를 찾고 계신가요?
• 언제쯤 필요하신가요?
• 예산이나 지역 등 선호사항이 있으신가요?

결혼 준비와 관련해서 어떤 도움이 필요하신지 다시 한번 말씀해 주세요!"""


def _generate_limited_service_message(user_input: str, name_part: str) -> str:
    """Generate limited service notification message."""
    
    return f"""안녕하세요{name_part}! 요청을 처리하고 있는데, 일시적으로 일부 서비스에 접근하기 어려운 상황입니다.

현재 가능한 범위에서 도움을 드리려고 하는데, 평소보다 제한적인 정보만 제공할 수 있을 것 같아요.

그래도 결혼 준비에 대한 일반적인 조언이나 기본적인 가이드라인은 도움을 드릴 수 있습니다. 어떤 부분이 가장 궁금하신가요?

시스템이 정상화되면 더 상세한 추천과 정보를 제공해 드릴 수 있습니다."""


def _generate_confirmation_request_message(state: dict, name_part: str) -> str:
    """Generate user confirmation request message."""
    
    return f"""안녕하세요{name_part}! 요청을 처리하는 중에 확인이 필요한 부분이 있습니다.

기존 정보와 새로 입력해 주신 정보가 다른 것 같아요. 어떤 정보로 업데이트하시길 원하시는지 확인해 주세요.

정확한 정보로 업데이트해 드린 후 더 나은 추천을 제공해 드리겠습니다!"""


def _generate_alternative_approach_message(user_input: str, name_part: str) -> str:
    """Generate alternative approach message."""
    
    return f"""안녕하세요{name_part}! 요청해 주신 방식으로는 처리가 어려워서, 다른 방법으로 도움을 드리려고 합니다.

결혼 준비와 관련해서 필요한 정보나 조언은 최대한 제공해 드릴 수 있어요.

어떤 부분이 가장 궁금하시거나 도움이 필요하신지 말씀해 주시면, 가능한 방법으로 도움을 드리겠습니다!"""


def _generate_apologetic_failure_message(user_input: str, name_part: str, error_analysis: dict) -> str:
    """Generate apologetic failure message with helpful guidance."""
    
    error_type = error_analysis.get('error_type', 'unknown')
    
    base_message = f"""죄송합니다{name_part}. 요청을 처리하는 중에 기술적인 문제가 발생했습니다."""
    
    if error_type == 'database_connection_failure':
        guidance = """
현재 데이터베이스 연결에 일시적인 문제가 있어 업체 정보를 가져올 수 없습니다.
잠시 후 다시 시도해 주시거나, 일반적인 결혼 준비 조언은 도움을 드릴 수 있습니다."""
        
    elif error_type == 'user_identification_failure':
        guidance = """
사용자 정보 확인에 문제가 발생했습니다.
새로고침 후 다시 시도해 주시거나, 새로운 세션으로 시작해 주세요."""
        
    else:
        guidance = """
잠시 후 다시 시도해 주시거나, 다른 방식으로 질문해 주시면 도움이 될 것 같습니다.
결혼 준비와 관련된 일반적인 조언이나 가이드라인은 언제든 제공해 드릴 수 있습니다."""
    
    return base_message + guidance + "\n\n결혼 준비를 위해 어떤 도움이 필요하신지 말씀해 주세요!"


def _log_error_information(error_analysis: dict, recovery_result: dict, state: dict, metadata: dict):
    """Log comprehensive error information for debugging and monitoring."""
    
    # In production, this would integrate with proper logging systems
    log_entry = {
        'timestamp': datetime.now().isoformat(),
        'error_analysis': error_analysis,
        'recovery_result': recovery_result,
        'state_summary': {
            'user_id': state.get('user_id'),
            'current_node': state.get('current_node'),
            'user_input_length': len(state.get('user_input', '')),
            'has_user_memo': bool(state.get('user_memo')),
            'processing_stage': state.get('routing_decision')
        },
        'processing_metadata': metadata
    }
    
    print(f"📋 에러 로그 생성됨: {log_entry['timestamp']}")


def _handle_critical_error_handler_failure(state: dict, critical_error: Exception, metadata: dict) -> dict:
    """Handle catastrophic failures in the error handler itself."""
    
    # This is the last resort error handling
    emergency_response = """시스템에 예상치 못한 문제가 발생했습니다. 

잠시 후 다시 시도해 주시거나, 브라우저를 새로고침한 후 다시 접속해 주세요.

계속해서 문제가 발생한다면 시스템 관리자에게 문의해 주시기 바랍니다.

결혼 준비를 위한 도움이 필요하시면 언제든 다시 찾아주세요."""
    
    state.update({
        'final_response': emergency_response,
        'status': 'critical_error',
        'termination_reason': 'error_handler_failure',
        'critical_error_info': {
            'error_message': str(critical_error),
            'error_type': type(critical_error).__name__,
            'timestamp': datetime.now().isoformat()
        },
        'processing_completed_at': datetime.now().isoformat()
    })
    
    print(f"💥 치명적 에러 핸들러 실패: {str(critical_error)}")
    
    return state

In [54]:
# Cell 17: error_handler_node 테스트

def test_error_handler_node():
    """Comprehensive test suite for error_handler_node functionality."""
    
    print("🧪 error_handler_node 테스트 시작\n")
    
    test_cases = [
        {
            "name": "parsing_node 입력 검증 실패",
            "scenario": "parsing_input_validation_error",
            "state": _create_parsing_error_state()
        },
        {
            "name": "memo_check_node 사용자 ID 없음",
            "scenario": "memo_check_user_id_error", 
            "state": _create_memo_check_error_state()
        },
        {
            "name": "tool_execution_node DB 연결 실패",
            "scenario": "tool_execution_db_error",
            "state": _create_tool_execution_error_state()
        },
        {
            "name": "general_response_node 응답 생성 실패",
            "scenario": "general_response_error",
            "state": _create_general_response_error_state()
        },
        {
            "name": "memo_update_node 메모리 충돌",
            "scenario": "memo_update_conflict_error",
            "state": _create_memo_update_error_state()
        },
        {
            "name": "response_generation_node 최종 응답 실패",
            "scenario": "response_generation_error",
            "state": _create_response_generation_error_state()
        },
        {
            "name": "네트워크 타임아웃 에러 (복구 가능)",
            "scenario": "recoverable_network_error",
            "state": _create_network_error_state()
        },
        {
            "name": "알 수 없는 시스템 에러 (치명적)",
            "scenario": "critical_unknown_error",
            "state": _create_unknown_error_state()
        },
        {
            "name": "개인화된 에러 응답 테스트",
            "scenario": "personalized_error_response",
            "state": _create_personalized_error_state()
        },
        {
            "name": "상태 무결성 손상 에러",
            "scenario": "state_corruption_error",
            "state": _create_corrupted_state_error()
        }
    ]
    
    for i, test_case in enumerate(test_cases, 1):
        print(f"{'='*60}")
        print(f"테스트 {i}: {test_case['name']}")
        print(f"시나리오: {test_case['scenario']}")
        print(f"소스 노드: {test_case['state'].get('current_node', 'unknown')}")
        print(f"에러 상태: {test_case['state'].get('status', 'unknown')}")
        print("-" * 40)
        
        # Execute error_handler_node
        try:
            initial_state = test_case['state'].copy()
            result_state = error_handler_node(test_case['state'])
            
            # Analyze results
            _analyze_error_handler_results(test_case, initial_state, result_state)
            
        except Exception as e:
            print(f"❌ 테스트 실행 중 오류: {str(e)}")
            import traceback
            traceback.print_exc()
            
        print("-" * 60)
        print()
    
    print("🧪 error_handler_node 테스트 완료")


def _create_parsing_error_state() -> dict:
    """Create test state for parsing node error scenario."""
    
    return {
        "user_id": "test_user_001",
        "user_input": "",  # Empty input causing parsing error
        "current_node": "parsing_node",
        "status": "error",
        "reason": "No user input provided for parsing",
        "user_memo": {
            "profile": {
                "user_id": "test_user_001",
                "name": "김철수"
            }
        },
        "error_info": {
            "error_type": "input_validation_error",
            "timestamp": datetime.now().isoformat()
        }
    }


def _create_memo_check_error_state() -> dict:
    """Create test state for memo check node error scenario."""
    
    return {
        "user_id": None,  # Missing user_id
        "user_input": "결혼 준비 도움 필요해요",
        "current_node": "memo_check_node",
        "status": "error",
        "reason": "No user_id provided for memory check",
        "user_memo": {},
        "error_info": {
            "error_type": "user_identification_error",
            "timestamp": datetime.now().isoformat()
        }
    }


def _create_tool_execution_error_state() -> dict:
    """Create test state for tool execution node error scenario."""
    
    return {
        "user_id": "test_user_002",
        "user_input": "강남 웨딩홀 추천해주세요",
        "current_node": "tool_execution_node",
        "status": "error",
        "reason": "Database connection failed during tool execution",
        "routing_decision": "tool_execution",
        "tools_to_execute": ["db_query_tool"],
        "tool_results": {
            "db_query_tool": {
                "success": False,
                "error_message": "Database connection timeout"
            }
        },
        "user_memo": {
            "profile": {
                "user_id": "test_user_002",
                "name": "이영희",
                "wedding_date": "2025-10-15"
            }
        },
        "error_info": {
            "error_type": "database_connection_error",
            "timestamp": datetime.now().isoformat()
        }
    }


def _create_general_response_error_state() -> dict:
    """Create test state for general response node error scenario."""
    
    return {
        "user_id": "test_user_003",
        "user_input": "결혼 준비 힘들어요",
        "current_node": "general_response_node",
        "status": "error",
        "reason": "General response generation failed: LLM timeout",
        "routing_decision": "general_response",
        "user_memo": {
            "profile": {
                "user_id": "test_user_003"
            }
        },
        "error_info": {
            "error_type": "llm_timeout_error",
            "timestamp": datetime.now().isoformat()
        }
    }


def _create_memo_update_error_state() -> dict:
    """Create test state for memo update node error scenario."""
    
    return {
        "user_id": "test_user_004",
        "user_input": "예산을 5000만원으로 변경해주세요",
        "current_node": "memo_update_node",
        "status": "user_confirmation_required",
        "reason": "User confirmation required for conflicting information",
        "conflicts_detected": [
            {
                "field": "total_budget_manwon",
                "old_value": 3000,
                "new_value": 5000
            }
        ],
        "user_memo": {
            "profile": {
                "user_id": "test_user_004",
                "name": "박민수",
                "total_budget_manwon": 3000
            }
        },
        "error_info": {
            "error_type": "memory_conflict_error",
            "timestamp": datetime.now().isoformat()
        }
    }


def _create_response_generation_error_state() -> dict:
    """Create test state for response generation node error scenario."""
    
    return {
        "user_id": "test_user_005",
        "user_input": "스튜디오 추천해주세요",
        "current_node": "response_generation_node",
        "status": "error",
        "reason": "Response generation failed: Content processing error",
        "routing_decision": "tool_execution",
        "tool_results": {
            "db_query_tool": {
                "success": True,
                "data": {"results_count": 5}
            }
        },
        "response_content": "",
        "user_memo": {
            "profile": {
                "user_id": "test_user_005",
                "name": "최지영"
            }
        },
        "error_info": {
            "error_type": "response_formatting_error",
            "timestamp": datetime.now().isoformat()
        }
    }


def _create_network_error_state() -> dict:
    """Create test state for recoverable network error scenario."""
    
    return {
        "user_id": "test_user_006",
        "user_input": "최신 웨딩 트렌드 알려주세요",
        "current_node": "tool_execution_node",
        "status": "error",
        "reason": "Network timeout during web search execution",
        "routing_decision": "tool_execution",
        "tools_to_execute": ["web_search_tool"],
        "tool_results": {
            "web_search_tool": {
                "success": False,
                "error_message": "Network timeout after 30 seconds"
            }
        },
        "user_memo": {
            "profile": {
                "user_id": "test_user_006"
            }
        },
        "error_info": {
            "error_type": "network_timeout_error",
            "timestamp": datetime.now().isoformat()
        }
    }


def _create_unknown_error_state() -> dict:
    """Create test state for critical unknown error scenario."""
    
    return {
        "user_id": "test_user_007",
        "user_input": "도움이 필요해요",
        "current_node": "unknown_node",
        "status": "error",
        "reason": "Unexpected system error occurred",
        "user_memo": {
            "profile": {
                "user_id": "test_user_007"
            }
        },
        "error_info": {
            "error_type": "unknown_system_error",
            "timestamp": datetime.now().isoformat()
        }
    }


def _create_personalized_error_state() -> dict:
    """Create test state for personalized error response scenario."""
    
    return {
        "user_id": "test_user_008",
        "user_input": "예식장 예약하고 싶어요",
        "current_node": "tool_execution_node",
        "status": "error",
        "reason": "Service temporarily unavailable",
        "user_memo": {
            "profile": {
                "user_id": "test_user_008",
                "name": "정현우",
                "wedding_date": "2025-11-20",
                "total_budget_manwon": 4500
            },
            "preferences": {
                "location_preferences": ["강남", "홍대"]
            }
        },
        "error_info": {
            "error_type": "service_unavailable_error",
            "timestamp": datetime.now().isoformat()
        }
    }


def _create_corrupted_state_error() -> dict:
    """Create test state for state corruption error scenario."""
    
    return {
        "user_id": None,  # Critical field missing
        "user_input": None,  # Critical field missing
        "current_node": "memo_check_node",
        "status": "error",
        "reason": "State corruption detected",
        "user_memo": None,
        "error_info": {
            "error_type": "state_corruption_error",
            "timestamp": datetime.now().isoformat()
        }
    }


def _analyze_error_handler_results(test_case: dict, initial_state: dict, result_state: dict):
    """Analyze and display error handler test results."""
    
    status = result_state.get('status', 'unknown')
    scenario = test_case['scenario']
    source_node = initial_state.get('current_node', 'unknown')
    
    print(f"📊 결과 분석:")
    print(f"   최종 상태: {status}")
    
    if status in ["error_handled", "critical_error"]:
        print("   ✅ 에러 처리 완료됨")
        
        final_response = result_state.get('final_response', '')
        error_processing_metadata = result_state.get('error_processing_metadata', {})
        
        print(f"   📏 에러 응답 길이: {len(final_response)}자")
        print(f"   🔧 복구 전략: {error_processing_metadata.get('recovery_strategy', 'unknown')}")
        print(f"   ⭐ 사용자 영향도: {error_processing_metadata.get('user_impact_level', 'unknown')}")
        print(f"   🏥 복구 시도: {'예' if error_processing_metadata.get('recovery_attempted') else '아니오'}")
        print(f"   ✅ 복구 성공: {'예' if error_processing_metadata.get('recovery_successful') else '아니오'}")
        
        # Display response preview
        preview = final_response[:120] + "..." if len(final_response) > 120 else final_response
        print(f"   💬 응답 미리보기:")
        print(f"      {preview}")
        
        # Scenario-specific validation
        _validate_scenario_specific_error_handling(scenario, source_node, initial_state, result_state)
        
        # Quality assessment
        _assess_error_response_quality(scenario, final_response, error_processing_metadata, initial_state)
        
    else:
        print(f"   ❌ 예상치 못한 상태: {status}")
        reason = result_state.get('reason', 'Unknown reason')
        print(f"   🚨 이유: {reason}")


def _validate_scenario_specific_error_handling(scenario: str, source_node: str, initial_state: dict, result_state: dict):
    """Validate error handling based on specific scenario requirements."""
    
    final_response = result_state.get('final_response', '')
    error_processing_metadata = result_state.get('error_processing_metadata', {})
    
    if scenario == "parsing_input_validation_error":
        _validate_parsing_error_handling(initial_state, result_state)
        
    elif scenario == "memo_check_user_id_error":
        _validate_memo_check_error_handling(initial_state, result_state)
        
    elif scenario == "tool_execution_db_error":
        _validate_tool_execution_error_handling(initial_state, result_state)
        
    elif scenario == "general_response_error":
        _validate_general_response_error_handling(initial_state, result_state)
        
    elif scenario == "memo_update_conflict_error":
        _validate_memo_update_error_handling(initial_state, result_state)
        
    elif scenario == "response_generation_error":
        _validate_response_generation_error_handling(initial_state, result_state)
        
    elif scenario == "recoverable_network_error":
        _validate_recoverable_error_handling(initial_state, result_state)
        
    elif scenario == "critical_unknown_error":
        _validate_critical_error_handling(initial_state, result_state)
        
    elif scenario == "personalized_error_response":
        _validate_personalized_error_handling(initial_state, result_state)
        
    elif scenario == "state_corruption_error":
        _validate_state_corruption_handling(initial_state, result_state)


def _validate_parsing_error_handling(initial_state: dict, result_state: dict):
    """Validate parsing error handling."""
    
    final_response = result_state.get('final_response', '')
    recovery_strategy = result_state.get('error_processing_metadata', {}).get('recovery_strategy', '')
    
    if 'clarification' in recovery_strategy or 'input_correction' in recovery_strategy:
        print("     ✅ 적절한 복구 전략 (입력 개선 요청)")
    else:
        print("     ❌ 부적절한 복구 전략")
    
    if any(word in final_response for word in ['구체적으로', '명확하게', '다시']):
        print("     ✅ 명확한 입력 요청 메시지")
    else:
        print("     ❌ 입력 개선 메시지 부족")


def _validate_memo_check_error_handling(initial_state: dict, result_state: dict):
    """Validate memo check error handling."""
    
    final_response = result_state.get('final_response', '')
    error_severity = result_state.get('error_processing_metadata', {}).get('error_severity', '')
    
    if error_severity in ['high', 'critical']:
        print("     ✅ 높은 심각도로 분류됨")
    else:
        print("     ❌ 심각도 분류 부적절")
    
    if any(word in final_response for word in ['새로고침', '다시 시작', '세션']):
        print("     ✅ 세션 재시작 안내 포함")
    else:
        print("     ❌ 복구 안내 부족")


def _validate_tool_execution_error_handling(initial_state: dict, result_state: dict):
    """Validate tool execution error handling."""
    
    final_response = result_state.get('final_response', '')
    recovery_attempted = result_state.get('error_processing_metadata', {}).get('recovery_attempted', False)
    
    if recovery_attempted:
        print("     ✅ 복구 시도가 이루어짐")
    else:
        print("     ❌ 복구 시도 없음")
    
    if any(word in final_response for word in ['일시적', '데이터베이스', '잠시 후']):
        print("     ✅ 적절한 기술적 설명")
    else:
        print("     ❌ 기술적 설명 부족")


def _validate_general_response_error_handling(initial_state: dict, result_state: dict):
    """Validate general response error handling."""
    
    final_response = result_state.get('final_response', '')
    
    if len(final_response) > 50:
        print("     ✅ 적절한 길이의 폴백 응답")
    else:
        print("     ❌ 폴백 응답 너무 짧음")


def _validate_memo_update_error_handling(initial_state: dict, result_state: dict):
    """Validate memo update error handling."""
    
    final_response = result_state.get('final_response', '')
    
    if any(word in final_response for word in ['확인', '정보', '업데이트']):
        print("     ✅ 확인 요청 메시지 적절")
    else:
        print("     ❌ 확인 요청 메시지 부족")


def _validate_response_generation_error_handling(initial_state: dict, result_state: dict):
    """Validate response generation error handling."""
    
    final_response = result_state.get('final_response', '')
    
    if final_response and len(final_response) > 30:
        print("     ✅ 최종 응답 생성 실패 시 폴백 제공")
    else:
        print("     ❌ 폴백 응답 부족")


def _validate_recoverable_error_handling(initial_state: dict, result_state: dict):
    """Validate recoverable error handling."""
    
    recovery_successful = result_state.get('error_processing_metadata', {}).get('recovery_successful', False)
    
    if recovery_successful:
        print("     ✅ 복구 가능한 에러로 성공적 처리")
    else:
        print("     ⚠️ 복구 시도했으나 부분적 성공")


def _validate_critical_error_handling(initial_state: dict, result_state: dict):
    """Validate critical error handling."""
    
    final_response = result_state.get('final_response', '')
    error_severity = result_state.get('error_processing_metadata', {}).get('error_severity', '')
    
    if error_severity == 'high':
        print("     ✅ 치명적 에러로 적절히 분류됨")
    else:
        print("     ❌ 심각도 분류 부족")
    
    if any(word in final_response for word in ['죄송', '문제', '시스템']):
        print("     ✅ 적절한 사과 및 시스템 문제 설명")
    else:
        print("     ❌ 치명적 에러 대응 부족")


def _validate_personalized_error_handling(initial_state: dict, result_state: dict):
    """Validate personalized error handling."""
    
    final_response = result_state.get('final_response', '')
    user_memo = initial_state.get('user_memo', {})
    user_name = user_memo.get('profile', {}).get('name', '')
    
    if user_name and user_name in final_response:
        print("     ✅ 개인화된 에러 응답 (이름 포함)")
    else:
        print("     ❌ 개인화 부족")
    
    # Check for wedding context preservation
    wedding_date = user_memo.get('profile', {}).get('wedding_date', '')
    if wedding_date and ('결혼' in final_response or '준비' in final_response):
        print("     ✅ 결혼 컨텍스트 유지됨")
    else:
        print("     ⚠️ 결혼 컨텍스트 미약함")


def _validate_state_corruption_handling(initial_state: dict, result_state: dict):
    """Validate state corruption error handling."""
    
    final_response = result_state.get('final_response', '')
    
    if any(word in final_response for word in ['새로고침', '다시 시작', '접속']):
        print("     ✅ 상태 복구를 위한 적절한 안내")
    else:
        print("     ❌ 상태 복구 안내 부족")


def _assess_error_response_quality(scenario: str, final_response: str, metadata: dict, initial_state: dict):
    """Assess overall error response quality."""
    
    response_length = len(final_response)
    recovery_attempted = metadata.get('recovery_attempted', False)
    user_impact_level = metadata.get('user_impact_level', 'unknown')
    
    print(f"   📏 품질 평가:")
    
    # Length assessment
    if 80 <= response_length <= 600:
        print("     ✅ 적절한 응답 길이")
    elif response_length < 80:
        print("     ⚠️ 응답이 너무 짧음")
    else:
        print("     ⚠️ 응답이 너무 김")
    
    # Recovery effort assessment
    if recovery_attempted:
        print("     ✅ 복구 노력이 시도됨")
    else:
        print("     ⚠️ 복구 시도 없음")
    
    # User impact assessment
    if user_impact_level in ['low', 'moderate']:
        print("     ✅ 사용자 영향 최소화됨")
    else:
        print("     ⚠️ 사용자 영향 높음")
    
    # Tone appropriateness
    if any(word in final_response for word in ['죄송', '도움', '다시']):
        print("     ✅ 적절한 사과 및 지원 톤")
    else:
        print("     ❌ 사과/지원 톤 부족")
    
    # Actionable guidance
    if any(word in final_response for word in ['시도', '새로고침', '문의', '다른']):
        print("     ✅ 실행 가능한 가이던스 제공")
    else:
        print("     ❌ 실행 가능한 가이던스 부족")


# 에러 시나리오 시뮬레이션 함수
def _simulate_error_scenarios():
    """Simulate various error scenarios for demonstration."""
    
    print("\n" + "="*60)
    print("🎭 에러 시나리오 시뮬레이션")
    print("="*60)
    
    scenarios = [
        {
            "name": "DB 연결 실패",
            "state": _create_tool_execution_error_state()
        },
        {
            "name": "개인화된 네트워크 에러", 
            "state": _create_personalized_error_state()
        }
    ]
    
    for scenario in scenarios:
        print(f"\n🎯 시나리오: {scenario['name']}")
        try:
            result = error_handler_node(scenario['state'])
            response = result.get('final_response', '응답 생성 실패')
            print(f"에러 응답: {response[:200]}...")
            
        except Exception as e:
            print(f"시뮬레이션 오류: {str(e)}")


# 테스트 실행 함수
if __name__ == "__main__":
    test_error_handler_node()
    
    # Uncomment to see error scenario simulations
    _simulate_error_scenarios()

🧪 error_handler_node 테스트 시작

테스트 1: parsing_node 입력 검증 실패
시나리오: parsing_input_validation_error
소스 노드: parsing_node
에러 상태: error
----------------------------------------
🚨 error_handler_node 활성화: parsing_node에서 오류 발생
   오류 상태: error
   오류 이유: No user input provided for parsing
📋 에러 로그 생성됨: 2025-09-22T16:34:28.153708
✅ 에러 처리 완료
   🔧 복구 전략: input_correction
   👤 사용자 영향도: low
   💬 응답 길이: 198자
📊 결과 분석:
   최종 상태: error_handled
   ✅ 에러 처리 완료됨
   📏 에러 응답 길이: 198자
   🔧 복구 전략: input_correction
   ⭐ 사용자 영향도: low
   🏥 복구 시도: 아니오
   ✅ 복구 성공: 아니오
   💬 응답 미리보기:
      안녕하세요 김철수님! 요청을 처리하는 중에 입력 내용을 정확히 파악하기 어려웠습니다.

조금 더 구체적으로 말씀해 주시면 더 정확한 도움을 드릴 수 있을 것 같아요.

예를 들어:
• 어떤 업체를 찾고 계신가요?
•...
     ✅ 적절한 복구 전략 (입력 개선 요청)
     ✅ 명확한 입력 요청 메시지
   📏 품질 평가:
     ✅ 적절한 응답 길이
     ⚠️ 복구 시도 없음
     ✅ 사용자 영향 최소화됨
     ✅ 적절한 사과 및 지원 톤
     ❌ 실행 가능한 가이던스 부족
------------------------------------------------------------

테스트 2: memo_check_node 사용자 ID 없음
시나리오: memo_check_user_id_error
소스 노드: memo_check_node
에러 상태: erro