In [None]:
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langgraph.graph import StateGraph
from langgraph.graph import MessagesState
from langchain_core.messages import HumanMessage, SystemMessage
from typing import TypedDict, List, Optional
from datetime import datetime
import dotenv
import os

dotenv.load_dotenv()

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")



In [None]:
import json
from collections import defaultdict, deque
from typing import Dict, List, Optional, Tuple, TypedDict, cast
from datetime import datetime, timedelta


class SubtaskSpec(TypedDict, total=False):
    title: str
    duration_days: int
    summary: str


class TaskSpec(TypedDict, total=False):
    id: str
    title: str
    summary: str
    duration_days: int
    dependencies: List[str]
    subtasks: List[SubtaskSpec]


class TimelineEntry(TypedDict, total=False):
    id: str
    title: str
    start: str
    end: str
    duration_days: int
    dependencies: List[str]
    subtasks: List[SubtaskSpec]


class RoadmapState(TypedDict, total=False):
    # === 입력 기반 ===
    user_request: str                   # 사용자가 입력한 원문 요구사항
    user_goal: str                     # 사용자가 제시한 최종 목표 (예: "Flutter Todo 앱 완성")
    timeframe: str                     # 기간 단위 ('day', 'week', 'month')
    start_date: datetime               # 기준 날짜 (없으면 오늘)

    # === AI 처리 중간 단계 ===
    extracted_tasks: List[TaskSpec]    # 목표로부터 추출된 세부 작업 목록
    dependencies: List[Tuple[str, str]]# 작업 간 선후관계 (A -> B)
    timeline: List[TimelineEntry]      # 각 작업별 기간 [{task, start, end}]

    # === 최종 생성물 ===
    roadmap_text: str                  # 자연어 로드맵 요약
    roadmap_json: Dict                 # 구조화된 로드맵 (예: {day/week: [tasks]})

    # === 메타 정보 ===
    progress_notes: List[str]          # 중간 수정 내역 or 사용자 피드백
    status: str                        # "analyzing", "planning", "completed"


llm = ChatOpenAI(model="gpt-4o-mini", api_key=OPENAI_API_KEY, temperature=0)


In [None]:
requirements_prompt = ChatPromptTemplate.from_messages([
    ("system", """
    당신은 사용자의 요구사항으로부터 실행 가능한 프로젝트 로드맵을 작성하는 비서입니다.

    - 무조건 JSON 형식으로만 응답합니다.
    - JSON 최상위 키는 goal, timeframe, start_date, tasks 입니다.
    - timeframe 키는 {{"unit": "day|week|month", "span": 정수}} 형태로 제공합니다.
    - start_date는 YYYY-MM-DD 형식으로 작성합니다. 사용자가 제공하지 않으면 today 파라미터를 사용합니다.
    - tasks는 각 항목마다 {{"id", "title", "summary", "duration_days", "dependencies", "subtasks"}} 를 포함합니다.
    - subtasks는 각 항목마다 {{"title", "duration_days"}}를 가진 객체 목록이며, 2~4개의 간결한 한국어 표현으로 작성하고 duration_days 합은 상위 작업 duration_days와 일치하도록 만듭니다.
    - dependencies는 선행 작업 id 문자열 목록입니다. 없으면 빈 배열을 사용합니다.
    - duration_days는 1 이상의 정수입니다.
    - 사용자의 요청이 모호하면 가장 합리적인 가정을 명시적으로 JSON에 기록합니다.
"""),
    ("system", "오늘 날짜: {today}"),
    ("human", "{request}")
])
requirements_chain = requirements_prompt | llm | StrOutputParser()

summary_prompt = ChatPromptTemplate.from_messages([
    ("system", """
        당신은 프로젝트 매니저입니다. 주어진 로드맵 정보를 사용자 친화적으로 설명하세요.
        프로젝트가 언제 시작해 언제 끝나는지, 어떤 주요 단계를 거치는지, 각 단계가 시작되기 위해 무엇이 선행되어야 하는지 순서대로 설명해야 합니다.
        사용자가 프로젝트의 주요 흐름을 쉽게 이해할 수 있도록 풀어서 설명하며, 주요 단계에 대한 간단한 설명도 포함시켜야합니다.
        각 단계에서 수행할 주요 하위 작업도 짧게 언급하되, 문단 수는 4개 이하로 유지하세요.
"""),
    ("human", "최종 목표: {goal}\n기간 단위: {timeframe}\n기준 날짜: {start_date}\n작업 타임라인:\n{timeline}")
])
summary_chain = summary_prompt | llm | StrOutputParser()


def _strip_json_markdown(text: str) -> str:
    cleaned = text.strip()
    if cleaned.startswith("```"):
        parts = cleaned.split("```" )
        for part in parts:
            if part.strip().startswith("{"):
                return part.strip()
    return cleaned


def _load_json(text: str) -> Dict:
    try:
        return json.loads(_strip_json_markdown(text))
    except json.JSONDecodeError as exc:
        raise ValueError(f"JSON 파싱 실패: {exc} | 원본: {text[:200]}")


def _parse_date(value: Optional[str], default: datetime) -> datetime:
    if not value:
        return default
    candidates = ["%Y-%m-%d", "%Y.%m.%d", "%Y/%m/%d"]
    for fmt in candidates:
        try:
            return datetime.strptime(value, fmt)
        except ValueError:
            continue
    try:
        return datetime.fromisoformat(value)
    except ValueError as exc:
        raise ValueError(f"지원하지 않는 날짜 형식: {value}") from exc


def _normalise_tasks(tasks: List[Dict]) -> List[TaskSpec]:
    normalised: List[TaskSpec] = []
    for idx, raw in enumerate(tasks, start=1):
        task_id = raw.get("id") or f"T{idx:02d}"
        title = raw.get("title") or raw.get("name") or raw.get("summary") or f"작업 {idx}"
        summary = raw.get("summary") or raw.get("description") or title
        duration = raw.get("duration_days") or raw.get("duration") or 1
        try:
            duration_int = max(1, int(duration))
        except (TypeError, ValueError):
            duration_int = 1
        dependencies = raw.get("dependencies") or []
        if not isinstance(dependencies, list):
            dependencies = [str(dependencies)]
        dependencies = [str(dep) for dep in dependencies if dep]
        raw_subtasks = raw.get("subtasks") or []
        subtasks: List[SubtaskSpec] = []
        if isinstance(raw_subtasks, list):
            for sub in raw_subtasks:
                if isinstance(sub, dict):
                    title_value = str(sub.get("title") or sub.get("summary") or "세부 작업")
                    duration_value = sub.get("duration_days") or sub.get("duration") or 1
                else:
                    title_value = str(sub)
                    duration_value = 1
                try:
                    duration_int_sub = max(1, int(duration_value))
                except (TypeError, ValueError):
                    duration_int_sub = 1
                subtasks.append(
                    SubtaskSpec(
                        title=title_value.strip()[:15],
                        duration_days=duration_int_sub,
                        summary=title_value.strip(),
                    )
                )
        elif isinstance(raw_subtasks, dict):
            title_value = str(raw_subtasks.get("title") or "세부 작업")
            duration_value = raw_subtasks.get("duration_days") or 1
            try:
                duration_int_sub = max(1, int(duration_value))
            except (TypeError, ValueError):
                duration_int_sub = 1
            subtasks.append(
                SubtaskSpec(
                    title=title_value.strip()[:15],
                    duration_days=duration_int_sub,
                    summary=title_value.strip(),
                )
            )
        else:
            text = str(raw_subtasks).strip()
            if text:
                subtasks.append(
                    SubtaskSpec(title=text[:15], duration_days=1, summary=text)
                )

        if not subtasks:
            subtasks.append(
                SubtaskSpec(
                    title="세부 작업",
                    duration_days=duration_int,
                    summary="세부 작업",
                )
            )
        else:
            total_sub = sum(sub.get("duration_days", 0) for sub in subtasks)
            if total_sub <= 0:
                for sub in subtasks:
                    sub["duration_days"] = 1
                total_sub = len(subtasks)
            if total_sub != duration_int:
                diff = duration_int - total_sub
                subtasks[-1]["duration_days"] = max(1, subtasks[-1]["duration_days"] + diff)

        normalised.append(
            TaskSpec(
                id=str(task_id),
                title=str(title),
                summary=str(summary),
                duration_days=duration_int,
                dependencies=dependencies,
                subtasks=subtasks,
            )
        )
    return normalised


def _topological_order(tasks: List[TaskSpec]) -> List[str]:
    graph: Dict[str, List[str]] = defaultdict(list)
    indegree: Dict[str, int] = defaultdict(int)
    ids = {task["id"] for task in tasks}
    for task in tasks:
        indegree.setdefault(task["id"], 0)
        for dep in task.get("dependencies", []):
            if dep not in ids:
                # 존재하지 않는 선행 작업은 무시
                continue
            graph[dep].append(task["id"])
            indegree[task["id"]] += 1
    queue = deque([task_id for task_id in ids if indegree[task_id] == 0])
    ordering: List[str] = []
    while queue:
        current = queue.popleft()
        ordering.append(current)
        for nxt in graph.get(current, []):
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                queue.append(nxt)
    if len(ordering) != len(ids):
        raise ValueError("선후관계에 순환이 있어 로드맵을 만들 수 없습니다.")
    return ordering


def _build_periods(schedule: List[Dict], base_date: datetime, unit: str) -> List[Dict]:
    buckets: Dict[int, Dict] = {}
    for item in schedule:
        start_dt: datetime = item["start_dt"]
        end_dt: datetime = item["end_dt"]
        if unit == "day":
            index = (start_dt.date() - base_date.date()).days
            label = f"Day {index + 1} ({start_dt.date().isoformat()})"
        elif unit == "week":
            index = (start_dt.date() - base_date.date()).days // 7
            week_start = base_date + timedelta(days=index * 7)
            week_end = week_start + timedelta(days=6)
            label = f"Week {index + 1} ({week_start.date().isoformat()} ~ {week_end.date().isoformat()})"
        else:  # month
            index = (start_dt.year - base_date.year) * 12 + (start_dt.month - base_date.month)
            label = f"Month {index + 1} ({start_dt.strftime('%Y-%m')})"
        bucket = buckets.setdefault(
            index,
            {
                "label": label,
                "start": (start_dt.date()).isoformat(),
                "end": (end_dt.date()).isoformat(),
                "tasks": [],
            },
        )
        bucket["tasks"].append(
            {
                "id": item["id"],
                "title": item["title"],
                "start": item["start"],
                "end": item["end"],
                "duration_days": item["duration_days"],
                "dependencies": item["dependencies"],
                "subtasks": item.get("subtasks", []),
            }
        )
        # 기간의 시작/끝을 확장
        bucket["start"] = min(bucket["start"], item["start"])
        bucket["end"] = max(bucket["end"], item["end"])
    return [buckets[idx] for idx in sorted(buckets.keys())]


def extract_requirements(state: RoadmapState) -> RoadmapState:
    request = state.get("user_request")
    if not request:
        raise ValueError("user_request가 비어 있습니다. 사용자 입력을 제공해 주세요.")
    today = datetime.now().date().isoformat()
    raw_response = requirements_chain.invoke({"request": request, "today": today})
    parsed = _load_json(raw_response)

    timeframe_info = parsed.get("timeframe", {})
    timeframe_unit = str(timeframe_info.get("unit", "week")).lower()
    if timeframe_unit not in {"day", "week", "month"}:
        timeframe_unit = "week"

    user_goal = parsed.get("goal", request.strip())
    start_date = _parse_date(parsed.get("start_date"), datetime.now())
    tasks = _normalise_tasks(parsed.get("tasks", []))

    dependencies: List[Tuple[str, str]] = []
    for task in tasks:
        for dep in task.get("dependencies", []):
            dependencies.append((dep, task["id"]))

    new_state = dict(state)
    new_state.update(
        {
            "status": "analyzing",
            "user_request": request,
            "user_goal": user_goal,
            "timeframe": timeframe_unit,
            "start_date": start_date,
            "extracted_tasks": tasks,
            "dependencies": dependencies,
            "progress_notes": state.get("progress_notes", []),
        }
    )
    return cast(RoadmapState, new_state)


def build_schedule(state: RoadmapState) -> RoadmapState:
    tasks = state.get("extracted_tasks", [])
    if not tasks:
        raise ValueError("추출된 작업이 없습니다. 선행 단계 결과를 확인하세요.")

    base_date = state.get("start_date") or datetime.now()
    order = _topological_order(tasks)

    task_map: Dict[str, TaskSpec] = {task["id"]: task for task in tasks}
    completion: Dict[str, Dict[str, datetime]] = {}
    schedule: List[Dict] = []
    cursor = base_date

    for task_id in order:
        task = task_map[task_id]
        deps = [dep for dep in task.get("dependencies", []) if dep in completion]
        if deps:
            start_dt = max(completion[dep]["end"] + timedelta(days=1) for dep in deps)
        else:
            start_dt = cursor
        start_dt = max(start_dt, cursor)
        duration = task.get("duration_days", 1)
        end_dt = start_dt + timedelta(days=duration - 1)

        completion[task_id] = {"start": start_dt, "end": end_dt}
        cursor = end_dt + timedelta(days=1)

        schedule.append(
            {
                "id": task_id,
                "title": task.get("title", task_id),
                "summary": task.get("summary", ""),
                "start_dt": start_dt,
                "end_dt": end_dt,
                "start": start_dt.date().isoformat(),
                "end": end_dt.date().isoformat(),
                "duration_days": duration,
                "dependencies": deps,
                "subtasks": task.get("subtasks", []),
            }
        )

    timeline_entries: List[TimelineEntry] = [
        TimelineEntry(
            id=item["id"],
            title=item["title"],
            start=item["start"],
            end=item["end"],
            duration_days=item["duration_days"],
            dependencies=item["dependencies"],
            subtasks=item.get("subtasks", []),
        )
        for item in schedule
    ]

    roadmap_json = {
        "goal": state.get("user_goal"),
        "timeframe_unit": state.get("timeframe"),
        "start_date": base_date.date().isoformat(),
        "periods": _build_periods(schedule, base_date, state.get("timeframe", "week")),
    }

    new_state = dict(state)
    new_state.update(
        {
            "status": "planning",
            "timeline": timeline_entries,
            "roadmap_json": roadmap_json,
        }
    )
    return cast(RoadmapState, new_state)


def summarise_roadmap(state: RoadmapState) -> RoadmapState:
    timeline_lines = []
    for item in state.get("timeline", []):
        deps = ", ".join(item.get("dependencies", [])) or "선행 없음"
        subtasks = item.get("subtasks", []) or []
        subtasks_str = "; ".join(
            f"{sub.get('title', '')}({sub.get('duration_days', 1)}일)" for sub in subtasks
        ) or "세부 작업 없음"
        line = (
            f"- {item['title']} ({item['start']} ~ {item['end']}, {item['duration_days']}일, 선행: {deps})\n"
            f"  · 세부: {subtasks_str}"
        )
        timeline_lines.append(line)
    summary_input = {
        "goal": state.get("user_goal", ""),
        "timeframe": state.get("timeframe", ""),
        "start_date": state.get("start_date", datetime.now()).date().isoformat(),
        "timeline": "\n".join(timeline_lines),
    }
    roadmap_text = summary_chain.invoke(summary_input)

    new_state = dict(state)
    new_state.update(
        {
            "status": "completed",
            "roadmap_text": roadmap_text.strip(),
        }
    )
    return cast(RoadmapState, new_state)



In [42]:
roadmap_graph_builder = StateGraph(RoadmapState)
roadmap_graph_builder.add_node("extract_requirements", extract_requirements)
roadmap_graph_builder.add_node("build_schedule", build_schedule)
roadmap_graph_builder.add_node("summarise", summarise_roadmap)

roadmap_graph_builder.set_entry_point("extract_requirements")
roadmap_graph_builder.add_edge("extract_requirements", "build_schedule")
roadmap_graph_builder.add_edge("build_schedule", "summarise")
roadmap_graph_builder.set_finish_point("summarise")

roadmap_graph = roadmap_graph_builder.compile()


def generate_roadmap(user_request: str, progress_notes: Optional[List[str]] = None) -> RoadmapState:
    """사용자 요구를 받아 LangGraph 로드맵 파이프라인을 실행한다."""
    initial_state: RoadmapState = {
        "user_request": user_request,
        "progress_notes": progress_notes or [],
    }
    return roadmap_graph.invoke(initial_state)



In [43]:
# 사용 예시
sample_request = """
나는 운동 기록 앱을 만들고 싶어. 한 3주 정도를 잡고 만들어보고 싶은데 로드맵을 만들어줘.
"""

# 아래 셀을 실행하면 LangGraph 파이프라인이 로드맵을 생성합니다.
generate_roadmap(sample_request)



{'user_request': '\n나는 운동 기록 앱을 만들고 싶어. 한 3주 정도를 잡고 만들어보고 싶은데 로드맵을 만들어줘.\n',
 'user_goal': '운동 기록 앱 개발',
 'timeframe': 'week',
 'start_date': datetime.datetime(2025, 11, 10, 0, 0),
 'extracted_tasks': [{'id': '1',
   'title': '기획 및 요구사항 정의',
   'summary': '앱의 기능과 디자인을 구상하고 요구사항을 정리합니다.',
   'duration_days': 7,
   'dependencies': [],
   'subtasks': [{'title': '기능 목록 작성',
     'duration_days': 3,
     'summary': '기능 목록 작성'},
    {'title': 'UI/UX 디자인 구상', 'duration_days': 2, 'summary': 'UI/UX 디자인 구상'},
    {'title': '요구사항 문서화', 'duration_days': 2, 'summary': '요구사항 문서화'}]},
  {'id': '2',
   'title': '개발 환경 설정',
   'summary': '개발에 필요한 도구와 환경을 설정합니다.',
   'duration_days': 3,
   'dependencies': ['1'],
   'subtasks': [{'title': 'IDE 설치', 'duration_days': 1, 'summary': 'IDE 설치'},
    {'title': '프로젝트 구조 설정', 'duration_days': 1, 'summary': '프로젝트 구조 설정'},
    {'title': '필요 라이브러리 설치', 'duration_days': 1, 'summary': '필요 라이브러리 설치'}]},
  {'id': '3',
   'title': '기본 기능 개발',
   'summary': '운동 기록의 기본 기능을