In [None]:
# agents/situation_agent.py
from typing import Dict, Any, Optional, List, TypedDict
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing_extensions import Annotated
import operator

# --- LangChain message 타입  ---
try:
    from langchain_core.messages import BaseMessage, HumanMessage
except Exception:  # 런타임 의존성 없을 때도 동작하도록
    class BaseMessage:  # minimal stub
        content: str
        def __init__(self, content: str): self.content = content
    class HumanMessage(BaseMessage): pass

# === Super Agent용 State ===
class OptimizedStrategyState(TypedDict, total=False):
    # 대화 로그
    messages: Annotated[List[BaseMessage], operator.add]

    # Team 1
    analysis_report: str
    summary_data: dict
    strategy_mod: str         
    risk_level: float

    # 가맹점/지역
    target_store_id: str
    target_store_name: str
    target_region: str        
    target_market_id: str     # (선택) market_id

    # Team 2 (분석팀 산출물)
    market_customer_analysis: str   # Agent1+2 통합
    targeting_positioning: str      # Agent3+4 통합

    # 기간
    period_start: str            
    period_end: str              

    # 라우팅 등
    next: str

# ===== 기존 구현(이벤트/날씨 툴) =====
from tools.tavily_events import get_tool as get_events_tool
from tools.weather_signals import detect_weather_signals

def default_market_locator(mid: str):
    return (37.5446, 127.0559, "성수동")

# LangChain StructuredTool 준비
TAVILY_EVENTS_TOOL = get_events_tool(market_locator=default_market_locator)

def _call_events(market_id: str, start: str, end: str, user_query: Optional[str]) -> Dict[str, Any]:
    """Tavily 이벤트 호출 (예외는 상위에서 처리)"""
    return TAVILY_EVENTS_TOOL.invoke({
        "market_id": market_id,
        "start": start,
        "end": end,
        "user_query": user_query,
    })

def _call_weather(user_query: Optional[str], store: Dict[str, Any], period: Dict[str, str]) -> Dict[str, Any]:
    """Open-Meteo 날씨 호출 (예외는 상위에서 처리)"""
    return detect_weather_signals({
        "user_query": user_query,
        "store": store,
        "period": period,
    })

# ===helper - 최신 사용자 메시지 추출 ===
def _latest_user_query(messages: Optional[List[BaseMessage]]) -> Optional[str]:
    if not messages:
        return None
    # 뒤에서부터 사람이 쓴 메시지 우선
    for m in reversed(messages):
        # LangChain HumanMessage 타입이면 그대로, 아니면 content만 사용
        if getattr(m, "type", None) == "human" or isinstance(m, HumanMessage):
            return getattr(m, "content", None)
    # 없으면 마지막 메시지 content라도
    return getattr(messages[-1], "content", None)

# == 새/구 State 모두 처리하는 노드 ===
def situation_agent_node(state: Dict[str, Any]) -> Dict[str, Any]:
    """
    (하위호환) 기존 state: { user_query, store{market_id}, period{start,end} }
    (신규) OptimizedStrategyState:
        - messages → user_query 추출
        - target_market_id | target_region | target_store_id → market_id로 사용
        - period_start/period_end → period 구성
    """
    # 1) 우선 기존 입력 방식 지원
    if "store" in state and "period" in state:
        user_query = state.get("user_query")
        store = state.get("store") or {}
        period = state.get("period") or {}
        mid, start, end = store.get("market_id"), period.get("start"), period.get("end")
    else:
        # 2) 새 OptimizedStrategyState를 store/period로 어댑트
        user_query = _latest_user_query(state.get("messages"))

        market_id = (
            state.get("target_market_id")
            or state.get("target_region")         # 지역명을 market_id로 재사용(내부 locator가 처리)
            or state.get("target_store_id")       # 최후 fallback
        )
        start = state.get("period_start")
        end = state.get("period_end")

        store = {
            "market_id": market_id,
            "store_id": state.get("target_store_id"),
            "store_name": state.get("target_store_name"),
            "region_name": state.get("target_region"),
        }
        period = {"start": start, "end": end}

    # 입력 검증
    if not (mid := store.get("market_id")) or not start or not end:
        merged = {
            "has_valid_signal": False,
            "summary": "입력 누락: market_id/period_start/period_end 필요",
            "signals": [],
            "citations": [],
            "assumptions": [],
            "contract_version": "situation.v2",
        }
        return {
            "situation": merged,
            "log": (state.get("log") or []) + ["[situation] 입력 누락 (v2 adapter)"]
        }

    # --- 병렬 실행 ---
    events, wx = None, None
    logs = state.get("log") or []

    with ThreadPoolExecutor(max_workers=2) as ex:
        futures = {
            ex.submit(_call_events, mid, start, end, user_query): "events",
            ex.submit(_call_weather, user_query, store, {"start": start, "end": end}): "weather",
        }
        for fut in as_completed(futures):
            tag = futures[fut]
            try:
                res = fut.result()
            except Exception as e:
                res = {
                    "has_valid_signal": False,
                    "summary": f"{tag} 수집 실패({e})",
                    "signals": [],
                    "citations": [],
                    "assumptions": [],
                    "contract_version": "situation.v2",
                }
                logs.append(f"[situation] {tag} error: {e}")

            if tag == "events":
                events = res
            else:
                wx = res

    # 안전 가드
    events = events or {"signals": [], "citations": [], "assumptions": [], "summary": None}
    wx     = wx     or {"signals": [], "citations": [], "assumptions": [], "summary": None}

    # --- 병합 ---
    ev_sig = events.get("signals") or []
    wx_sig = wx.get("signals") or []
    signals = ev_sig + wx_sig
    has_valid = bool(signals)

    parts = []
    if events.get("summary"): parts.append(events["summary"])
    if wx.get("summary"): parts.append(wx["summary"])
    summary = " / ".join(parts) if parts else "신호 없음"

    merged = {
        "has_valid_signal": has_valid,
        "summary": summary,
        "signals": signals,
        "citations": (events.get("citations") or []) + (wx.get("citations") or []),
        "assumptions": (events.get("assumptions") or []) + (wx.get("assumptions") or []),
        "contract_version": "situation.v2",
        # 선택: 분석팀 산출물 스냅샷을 함께 첨부하면 후속 노드에서 편합니다.
        "context": {
            "strategy_mod": state.get("strategy_mod"),
            "risk_level": state.get("risk_level"),
            "market_customer_analysis": state.get("market_customer_analysis"),
            "targeting_positioning": state.get("targeting_positioning"),
        }
    }

    logs.append(f"[situation] events={len(ev_sig)} weather={len(wx_sig)} (parallel, v2)")
    return {"situation": merged, "log": logs}


python-dotenv could not parse statement starting at line 3
  _tavily = TavilySearchResults(max_results=5, include_answer=True, include_raw_content=False)


In [None]:
# agents/content_agent.py

# --- 메시지 타입 스텁 (langchain_core.messages 미사용 환경 대응) ---
try:
    from langchain_core.messages import BaseMessage, AIMessage
except Exception:
    class BaseMessage:
        def __init__(self, content: str): self.content = content
    class AIMessage(BaseMessage): pass

from typing import Dict, Any, List, TypedDict
from typing_extensions import Annotated
import operator
import datetime as _dt

# State 정의 - 분석팀 STATE
class OptimizedStrategyState(TypedDict, total=False):
    # 대화 로그(리듀서)
    messages: Annotated[List[BaseMessage], operator.add]

    # Team 1 (분석팀 산출)
    analysis_report: str
    summary_data: dict
    strategy_mod: str                 
    risk_level: float               

    # 가맹점 관련 정보
    target_store_id: str            
    target_store_name: str        
    target_region: str               

    # Team 2 (추가 분석 통합물)
    market_customer_analysis: str   
    targeting_positioning: str      

    # 라우팅
    next: str
    situation: dict                 # situation_agent_node 출력물
    content_plan: dict              

# 헬퍼 유틸
def _today_weekday_str():
    return ["월","화","수","목","금","토","일"][_dt.date.today().weekday()]

def _mode_tone(strategy_mod: str) -> str:
    if not strategy_mod:
        return "균형 잡힌 톤(정보+참여)"
    sm = strategy_mod.lower()
    if sm.startswith("def"):
        return "안정적·신뢰 중심 톤(정보 비중↑)"
    if sm.startswith("agg"):
        return "공격적·캠페인 드라이브 톤(프로모션 비중↑)"
    return "균형 잡힌 톤(정보+참여)"

def _risk_tip(risk_level: float) -> str:
    try:
        r = float(risk_level)
    except Exception:
        return "표준 리스크 가이드: 과장/허위표현 주의, 쿠폰 남용 방지"
    if r >= 70:
        return "고위험: 과도한 할인·리뷰 인센티브 지양, 사실 검증 강조"
    if r >= 40:
        return "중위험: 명확한 조건 표기·반품/환불 안내 강화"
    return "저위험: 지역성·신뢰 신호(후기/원두정보 등) 강조"


# 콘텐츠 크리에이터 노드
def content_agent_node(state: OptimizedStrategyState) -> Dict[str, Any]:
    """
    입력: OptimizedStrategyState (분석팀 STATE + situation 결과)
    출력: content_plan(dict) + messages(리듀스 append)
    """
    store = state.get("target_store_name") or "매장"
    region = state.get("target_region") or "로컬"
    mod = state.get("strategy_mod") or "balanced"
    risk = state.get("risk_level", 0.0)
    analysis = state.get("analysis_report") or state.get("market_customer_analysis") or ""
    tp = state.get("targeting_positioning") or ""
    sit = state.get("situation", {}) or {}
    sigs = sit.get("signals") or []

    # 신호 요약
    signal_tags = []
    for s in sigs[:5]: 
        st = s.get("type") or "signal"
        sid = s.get("id") or ""
        signal_tags.append(f"{st}#{sid}")
    signal_line = ", ".join(signal_tags) if signal_tags else "신호 없음"

    # 전략 톤·리스크 가이드
    tone = _mode_tone(mod)
    risk_guide = _risk_tip(risk)

    # ====== 산출: 3가지 포맷의 포스트 아이디어 ======
    # 1) 리치형(정보·지역성)
    idea1 = {
        "title": f"[{region}] {store} 오늘의 추천 · 지역 이슈 큐레이션",
        "hook": f"{region} 지금 뜨는 이슈와 함께 즐기는 {store} 시그니처!",
        "copy": [
            f"- {region} 소식(요약) + 매장 추천 메뉴 1-2개를 묶어 정보성 포스트 구성",
            f"- 분석 톤: {tone}",
            f"- 신호 반영: {signal_line}",
        ],
        "media": ["매장 전경 1컷", "추천 메뉴 근접 1컷"],
        "cta": "이슈 확인하고 매장에서 시그니처 한 잔!",
        "hashtags": [f"#{region}", f"#{store}", "#오늘의추천", "#로컬스팟"]
    }

    # 2) 참여유도형(커뮤니티 질문)
    idea2 = {
        "title": f"[{store}] 메뉴 토너먼트 · 투표형 스토리",
        "hook": "당신의 선택은? 시그니처 vs 시즌한정",
        "copy": [
            "스토리 투표 2~3라운드로 선호도 조사",
            f"포지셔닝 힌트: {tp[:80]}..." if tp else "포지셔닝: 부각할 차별 포인트를 짧게",
            f"리스크 가이드: {risk_guide}",
        ],
        "media": ["메뉴 2분할 이미지(정사각)"],
        "cta": "투표 참여하고 결과 공개 때 쿠폰도 받아요!",
        "hashtags": [f"#{store}", "#투표이벤트", "#메뉴토너먼트"]
    }

    # 3) 전환형(오퍼/쿠폰)
    idea3 = {
        "title": f"[{region}] {store} 쿠폰 드롭 · {_today_weekday_str()} 한정",
        "hook": "3시간 한정 쿠폰, 지금 픽업하면 추가 혜택!",
        "copy": [
            "· 조건: 픽업/음료 1잔 이상, 중복할인 불가",
            "· 사실 기반 문구만 사용(원산지·용량·알레르기 표기)",
        ],
        "media": ["쿠폰 그래픽 1컷(세로)"],
        "cta": "지금 픽업 예약하고 혜택 받기",
        "hashtags": [f"#{region}", f"#{store}", "#한정쿠폰", "#픽업전용"]
    }

    # 스케줄 제안
    schedule = [
        {"slot": "오전(07-10)", "goal": "출근 동선 노출", "rec": "리치형(정보·지역) or 투표 1라운드"},
        {"slot": "점심(11-14)", "goal": "근처 체류 전환", "rec": "쿠폰 드롭(픽업 전용)"},
        {"slot": "저녁(17-20)", "goal": "재방문 리마인드", "rec": "투표 결과 공개 + 후기 리그램"},
    ]

    content_plan = {
        "contract_version": "content.v1",
        "store": store,
        "region": region,
        "strategy_mode": mod,
        "risk_level": risk,
        "signal_summary": signal_line,
        "ideas": [idea1, idea2, idea3],
        "schedule": schedule,
        # 추후 레포트/요약 활용 포켓
        "notes": {
            "analysis_hint": (analysis[:220] + "...") if analysis else "",
            "positioning_hint": (tp[:120] + "...") if tp else "",
        }
    }

    # messages 스트림에 AI 응답 push(옵션)
    msgs = list(state.get("messages") or [])
    preview = (
        f"[콘텐츠 플랜]\n"
        f"- 스토어: {store} / 지역: {region}\n"
        f"- 톤: {tone} / 리스크: {risk_guide}\n"
        f"- 신호: {signal_line}\n"
        f"- 아이디어: 3건 구성(리치·참여·전환)"
    )
    msgs.append(AIMessage(content=preview))

    return {"content_plan": content_plan, "messages": msgs}




In [None]:
# agents/graph.py
from langgraph.graph import StateGraph, END

# 공용 State & 두 에이전트 노드
from agents.situation_agent import situation_agent_node, OptimizedStrategyState
from agents.content_agent import content_agent_node

# (옵션) Strategy 노드가 아직 없다면 스텁으로 대체
try:
    from agents.strategy_agent import strategy_agent_node  # 실제 구현이 있으면 자동 사용
except Exception:
    def strategy_agent_node(state: OptimizedStrategyState):
        # 아주 가벼운 플레이스홀더: content_plan/situation을 요약해서 final_strategy에 담아 종료
        store = state.get("target_store_name") or "매장"
        region = state.get("target_region") or "로컬"
        mod = state.get("strategy_mod") or "balanced"
        sit = state.get("situation", {}) or {}
        sig = sit.get("signals") or []
        summary = (
            f"[전략 스텁] {region}/{store} · mode={mod} · signals={len(sig)}개"
        )
        msgs = list(state.get("messages") or [])
        try:
            from langchain_core.messages import AIMessage
            msgs.append(AIMessage(content=summary))
        except Exception:
            pass
        return {"final_strategy": {"summary": summary, "contract_version": "strategy.stub.v1"},
                "messages": msgs}

# HumanMessage 폴백
try:
    from langchain_core.messages import HumanMessage
except Exception:
    class HumanMessage:
        def __init__(self, content: str): self.content = content


# Supervisor 라우팅 규칙
def _intent_from_state(state: OptimizedStrategyState) -> str:
    """
    규칙 기반 의도 판별:
    1) 사용자가 next를 명시하면 그대로 따름
    2) 질의/키워드/입력 신호를 보고 guess
       - 행사/날씨/기간 키가 있으면 → situation
       - 콘텐츠/쿠폰/포스팅 키워드 or 분석팀 산출물만 있을 때 → content
       - 그 외 → strategy
    """
    # 1) 명시 라우팅 우선
    nxt = (state.get("next") or "").lower()
    if nxt in {"situation", "content", "strategy"}:
        return nxt

    # 2) 키워드/필드 기반 휴리스틱
    #   - messages 텍스트 스캔 
    text = ""
    msgs = state.get("messages") or []
    if msgs:
        last = msgs[-1]
        text = getattr(last, "content", "") or ""

    t = text.lower()
    has_period = bool(state.get("period_start") and state.get("period_end"))
    if ("날씨" in t or "비" in t or "우천" in t or "행사" in t or "이벤트" in t) or has_period:
        return "situation"

    if ("콘텐츠" in t or "포스트" in t or "쿠폰" in t or "캠페인" in t
        or state.get("analysis_report") or state.get("market_customer_analysis")
        or state.get("targeting_positioning")):
        return "content"

    return "strategy"


# Supervisor 노드(내용 없음; 조건 분기를 위한 NO-OP)
def _supervisor_node(state: OptimizedStrategyState):
    return {}  # 상태 변경 없이 다음 노드만 고름.


def build_graph() -> "CompiledGraph":
    """
    Entry: supervisor -> (intent router) -> {situation | content | strategy} -> END
    - supervisor: 의도 파악(규칙 기반)으로 첫 실행 노드 결정
    - situation: 이벤트/날씨 신호 수집 및 병합 (state['situation'])
    - content:   situation + 분석팀 산출물 바탕으로 content_plan 생성
    - strategy:  (있으면) 최종 전략 산출, 없으면 스텁이 summary만 생성
    """
    g = StateGraph(OptimizedStrategyState)

    # 1) 노드 등록
    g.add_node("supervisor", _supervisor_node)
    g.add_node("situation", situation_agent_node)
    g.add_node("content", content_agent_node)
    g.add_node("strategy", strategy_agent_node)

    # 2) 시작 노드
    g.set_entry_point("supervisor")

    # 3) supervisor에서 세 갈래 분기
    g.add_conditional_edges(
        "supervisor",
        _intent_from_state,
        {"situation": "situation", "content": "content", "strategy": "strategy"},
    )

    # 4) 각 에이전트 실행 후 종료(단발 실행 모델)
    g.add_edge("situation", END)
    g.add_edge("content", END)
    g.add_edge("strategy", END)

    return g.compile()



flowchart LR
  subgraph StateGraph<OptimizedStrategyState>
    EP(((ENTRY))) --> SUP[SUPERVISOR\n(_intent_from_state)]
    SUP -->|situation| SIT[SITUATION\n(situation_agent_node)]
    SUP -->|content| CON[CONTENT\n(content_agent_node)]
    SUP -->|strategy| STR[STRATEGY\n(strategy_agent_node or stub)]
    SIT --> END[(END)]
    CON --> END
    STR --> END
  end