### 설치된 라이브러리 버전 확인
- **API 차이로 막히지 않도록 현재 설치 버전 명시**
- **무엇**: `importlib.metadata.version()`로 `langgraph`, `openai`, `typing_extensions` 출력
- **관찰**: 버전 문자열이 정상 출력되면 OK

In [None]:
import sys, platform
from importlib.metadata import version, PackageNotFoundError

def get_ver(pkg: str) -> str:
    try:
        return version(pkg)
    except PackageNotFoundError:
        return "not installed"

print("Python:", sys.version.split()[0])
print("OS:", platform.platform())
print("langgraph:", get_ver("langgraph"))
print("openai:", get_ver("openai"))
print("typing_extensions:", get_ver("typing_extensions"))

### LM Studio 로컬 서버 연결
- **OpenAI SDK 인터페이스 유지 + 로컬 호스팅 모델 사용**
- **무엇**:
  - `OpenAI(base_url="http://localhost:1234/v1", api_key="lm-studio")`
  - `client.models.list()`로 연결 확인
  - `chat_local(prompt)` 보조 함수 정의
- **관찰**: 모델 리스트 보이고 `chat_local("hi")`가 응답
- **이슈**: 연결 실패 시 LM Studio **Server 탭** 실행/포트 확인

#### LM Studio 모델 리스트 확인

In [None]:
from openai import OpenAI

# LM Studio 엔드포인트와 임의 API 키
client = OpenAI(
    base_url="http://localhost:1234/v1",
    api_key="lm-studio"  # 임의 문자열 가능
)

# 간단한 헬스체크
models = client.models.list()
print("Models:", [m.id for m in models.data])

def chat_local(prompt: str, model: str = None, temperature: float = 0.2) -> str:
    """LM Studio 호환 Chat Completions 호출"""
    model = model or (models.data[0].id if models.data else "meta-llama-3.1-8b-instruct")
    resp = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        temperature=temperature,
    )
    return resp.choices[0].message.content.strip()


### OpenAI API 사용

In [None]:
# 1) 루트 폴더에 .env 파일을 만들고 아래처럼 키를 적어둡니다.
#    OPENAI_API_KEY=sk-********************************
# 2) 필요시 설치: uv add python-dotenv openai

import os
from dotenv import load_dotenv
from openai import OpenAI

# .env 로딩
_ = load_dotenv()  # 기본적으로 현재 작업 디렉터리의 .env 탐색

# 환경변수에서 키 읽기 (dotenv가 주입)
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
    raise RuntimeError(
        "환경변수 OPENAI_API_KEY가 없습니다. 프로젝트 루트에 .env 파일을 만들고 "
        "OPENAI_API_KEY=... 값을 넣은 뒤 다시 실행하세요."
    )

# OpenAI 기본 클라이언트 (공식 엔드포인트 사용, base_url 불필요)
client = OpenAI(api_key=api_key)

# 사용할 모델 (필요시 변경)
DEFAULT_MODEL = "gpt-4o-mini"

def chat_local(prompt: str, model: str = None, temperature: float = 0.2) -> str:
    """
    기존 노트북의 chat_local 함수를 OpenAI API 호출로 대체합니다.
    다른 셀 코드는 수정 없이 그대로 사용 가능합니다.
    """
    model = model or DEFAULT_MODEL
    resp = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        temperature=temperature,
    )
    return (resp.choices[0].message.content or "").strip()

# 스모크 테스트
print(chat_local("한 줄로 인사해줘"))


### Hello, Graph
- **LangGraph 모델(상태→노드→엣지) 파악**
- **무엇**:
  - 상태 `{"input","output"}`
  - `echo_node` 하나 + `START→echo→END`
- **관찰**: `output`에 `Echo: ...` 반환

In [None]:
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END

In [None]:
class S1(TypedDict):
    input: str      # key나 값을 입력하는 것이 아닌 어떤 데이터가 들어갈것인지 전달
    output: str

g1 = StateGraph(S1)

In [None]:
# 노드에서 사용할 에이전트
def echo_node(state: S1) -> S1:
    return {    # return 값은 state를 업데이트!
        "output": f"Echo: {state['input']}"
    } 

In [None]:
# 노드 생성
g1.add_node("echo", echo_node)

# 엣지 연결 생성
g1.add_edge(START, "echo")
g1.add_edge("echo", END)

In [None]:
# Graph Compile -> 유효성 검사
app1 = g1.compile()

app1 # 그래프 출력

In [None]:
# invoke로 값 입력 후 실행
app1.invoke({"input": "LangGraph 첫 그래프!"})

### LLM node 추가
- **LLM 호출을 그래프 **노드로 캡슐화**하는 법 익히기**
- **무엇**:
  - 상태 `{"question","answer"}`
  - `llm_node` 내부에서 `chat_local()` 호출
  - `START→llm→END`
- **관찰**: `answer`에 한 줄 답 도착

In [None]:
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END

In [None]:
# State 정의
class S2(TypedDict):
    question: str
    answer: str

g2 = StateGraph(S2)

In [None]:
# 에이전트 정의
def echo_node(state: S2) -> S2:
    print("Echo Node ->", state)
    return {    
        "question": f"Echo: {state['question']}"
    } 

def llm_node(state: S2) -> S2:
    ans = chat_local(f"한 문장으로 답해줘: {state['question']}")
    print("LLM Node ->", state)
    return {"answer": ans}

In [None]:
# 노드 추가
g2.add_node("echo_node", echo_node)
g2.add_node("llm_node", llm_node)

# 엣지 연결 추가
g2.add_edge(START, "llm_node")
g2.add_edge("llm_node", "echo_node")
g2.add_edge("echo_node", END)
app2 = g2.compile()

In [None]:
# 실행!
app2.invoke({"question": "LangGraph는 무엇인가?"})

In [None]:
app2

### Conditional Edge 
- **간단한 분기 생성**
- **입력 유효성 검사 및 흐름 제어**
- **무엇**:
  - 상태 `{"question","result"}`
  - `route_fn`으로 빈 입력 시 종료, 아니면 처리
  - `add_conditional_edges("validate", route_fn, {"empty": END, "ok": "answer"})`
- **관찰**: 빈 입력은 종료, 유효 입력은 답변 노드로 이동

In [None]:
from typing_extensions import TypedDict

class S3(TypedDict):
    question: str
    result: str

def validate(state: S3) -> S3:
    q = (state.get("question") or "").strip()
    return {"result": "빈 질문"}

def answer(state: S3) -> S3:
    q = state["question"]
    a = chat_local(f"간단히 답해줘: {q}")
    return {"result": a}

def route_fn(state: S3) -> str:
    if not (state.get("question") or "").strip():
        return "empty"
    return "ok"

g3 = StateGraph(S3)
g3.add_node("validate", validate)
g3.add_node("answer", answer)
g3.add_edge(START, "validate")
g3.add_conditional_edges("validate", route_fn, {"empty": END, "ok": "answer"})
g3.add_edge("answer", END)
app3 = g3.compile()

print(app3.invoke({"question": ""}))
print(app3.invoke({"question": "Conditional Edge는 언제 쓰나?"}))

### Reducer(상태 합치기)
- **여러 노드 결과를 **한 키(리스트)**로 안전히 누적**
- **무엇**:
  - 상태에 `facts: Annotated[list[str], operator.add]`
  - `fact_a`, `fact_b`가 각각 한 문장 생성 → 리스트 병합
- **관찰**: `facts`에 2개 이상 항목이 합쳐져 반환
- **이슈**: `reducer` 미지정 시 덮어쓰기 발생

In [None]:
from typing_extensions import Annotated, TypedDict
import operator

class S4(TypedDict):
    topic: str
    facts: Annotated[list[str], operator.add]  # 리스트를 합치는 규칙

def fact_a(state: S4) -> S4:
    t = state["topic"]
    return {"facts": [chat_local(f"{t}에 대해 핵심 사실 1가지를 1문장으로.", temperature=0.1)]}

def fact_b(state: S4) -> S4:
    t = state["topic"]
    return {"facts": [chat_local(f"{t}에 대해 추가 사실 1가지를 1문장으로.", temperature=0.1)]}

g4 = StateGraph(S4)
g4.add_node("fact_a", fact_a)
g4.add_node("fact_b", fact_b)
g4.add_edge(START, "fact_a")
g4.add_edge("fact_a", "fact_b")
g4.add_edge("fact_b", END)
app4 = g4.compile()

app4.invoke({"topic": "LangGraph"})

### 체크포인트 & 스트리밍
- **실행 추적/디버깅/재실행(thread)**
- **무엇**:
  - `MemorySaver` checkpointer
  - 같은 `thread_id`로 `app.stream(..., stream_mode="values")` 두 번 실행
- **관찰**: 중간 상태가 흘러나오고, 재실행도 로그가 보임
- **이슈**: `stream_mode="values"`와 `thread_id` 누락 주의

In [None]:
from langgraph.checkpoint.memory import MemorySaver

checkpointer = MemorySaver()

# 이전 상태 재사용 가능한 간단 그래프
class S5(TypedDict):
    text: str
    steps: Annotated[list[str], operator.add]

def step1(state: S5) -> S5:
    return {"steps": [f"입력 길이: {len(state['text'])}"]}

def step2(state: S5) -> S5:
    s = chat_local(f"요약: {state['text']}")
    return {"steps": [f"요약완료({len(s)}자)"]}

g5 = StateGraph(S5)
g5.add_node("step1", step1)
g5.add_node("step2", step2)
g5.add_edge(START, "step1")
g5.add_edge("step1", "step2")
g5.add_edge("step2", END)
app5 = g5.compile(checkpointer=checkpointer)

thread = {"configurable": {"thread_id": "demo-1"}}

print("== 1차 실행 ==")
for update in app5.stream({"text": "LangGraph는 상태+노드+엣지로 구성된 워크플로우 엔진."}, thread, stream_mode="values"):
    print(update)

print("\n== 2차 실행(같은 thread_id) ==")
for update in app5.stream({"text": "같은 스레드에서 두 번째 실행"}, thread, stream_mode="values"):
    print(update)


### Send / Command
- 동적 fan-out
- 한 개 입력에 포함된 여러 sub-question을 계획 -> 병렬 답변 -> 모으기 흐름으로 처리
- 하나의 입력을 **여러 하위 작업**(질문들)으로 병렬 처리
- **무엇**:
  - 상태:  
    - `questions: Annotated[list[str], operator.add]`  
    - `answers:   Annotated[list[str], operator.add]`
  - `plan` 노드: raw 텍스트를 `?` 기준으로 분리해 `questions`에 저장
  - **핵심**: `add_conditional_edges("plan", fanout)`에서 `fanout`이  
    `return [Send("qa", {"question": q}) for q in state["questions"]]` 반환
  - `qa` 노드: `state["question"]`를 읽고 답 생성 → `answers`에 1개씩 추가
  - 엣지: `START→plan`, `qa→join`, `join→END`
- **관찰**:
  - 스트리밍: `{'questions': [...]}` 후 `{'answers': ['Q: ... | A: ...']}`가 **여러 번** 출력
  - 최종 상태에서 모든 Q/A가 `answers`에 누적
- **이슈**:
  - `answers`가 비면  
    ① 리듀서 누락(반드시 `Annotated[..., operator.add]`)  
    ② 팬아웃 미실행(조건 엣지/`Send` 반환 확인)  
    ③ `qa`에서 `question` 접근 방식 확인(`state["question"]`)

In [None]:
from typing_extensions import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.types import Send
import operator

# 1) 상태: 리스트 병합 규칙 꼭 지정
class S(TypedDict):
    raw: str
    questions: Annotated[list[str], operator.add]
    answers:   Annotated[list[str], operator.add]

# 2) plan: 질문 리스트만 상태에 기록
def plan(state: S):
    qs = [p.strip()+"?" for p in state["raw"].split("?") if p.strip()]
    return {"questions": qs}

# 3) fanout(조건 엣지 함수): qa로 Send 목록 반환  ← 핵심!
def fanout(state: S):
    return [Send("qa", {"question": q}) for q in state["questions"]]

# 4) qa: Send로 넘어온 입력(arg)에 'question'이 담겨옴
def qa(state: S) -> S:
    q = state.get("question", "")
    a = chat_local(f"아주 짧게 대답해줘: {q}", temperature=0.1) or ""
    return {"answers": [f"Q: {q} | A: {a.strip()}"]}

# 5) join: 마무리(패스스루)
def join(state: S) -> S:
    return {}

# 6) 그래프 구성
g = StateGraph(S)
g.add_node("plan", plan)
g.add_node("qa", qa)
g.add_node("join", join)

g.add_edge(START, "plan")
# plan 다음 흐름을 '조건 엣지'로 정의 → qa가 질문 수만큼 실행됨
g.add_conditional_edges("plan", fanout)
# qa 실행이 끝난 뒤 join으로 수집
g.add_edge("qa", "join")
g.add_edge("join", END)

app = g.compile()

# 7) 실행
out = app.invoke({"raw": "LangGraph는 무엇인가? Conditional Edge는 언제 쓰나? Reducer는 왜 필요한가?"})
out


---

팬아웃(fan-out)은 **하나의 단계(노드)에서 여러 개의 하위 작업을 동시에(또는 여러 번) 흩뿌려 실행**하는 패턴  
“**하나 들어오면 → N개로 나눠서 → 각각 돌리고 → 다시 모은다**”

---

## 사용 이유

* **평행 작업**: 여러 질문/문서/URL을 **동시에** 처리하여 속도 상승
* **분리 정복**: 큰 문제를 **작은 하위 문제**들로 나눠 고품질 답변 획득
* **구조적 안전성**: 각 하위 작업을 독립 노드로 캡슐화(실패, 재시도, 로그 추적 쉬움)

---

## LangGraph에서 사용 방법

1. **fan-out(흩뿌리기)**

   * `Conditional Edge` 함수에서 `Send`들의 **리스트**를 반환
     → 런타임이 `qa` 노드를 **질문 수만큼** 실행
2. **fan-in(모으기 / join)**

   * 각 `qa`가 상태에 결과를 적고(예: `answers`),
   * **리듀서**(`Annotated[list[str], operator.add]`) 규칙으로 자동 병합
   * 모두 끝나면 `join` 노드로 흐름이 이어짐


### **Flow**

```
           ┌───────────┐
input ───▶ │  plan     │─── fan-out ──┐
           └───────────┘              │
                 │                    │
        [Send("qa", q1), ...]         │
                 │                    ▼
           ┌───────────┐       ┌───────────┐
           │   qa(q1)  │  ...  │   qa(qN)  │
           └───────────┘       └───────────┘
                 \______________________/ 
                          │  fan-in (리듀서로 병합)
                          ▼
                    ┌───────────┐
                    │   join    │
                    └───────────┘
```

---


## 활용 방안

* **여러 질문/프롬프트**를 한 번에 답할 때
* **문서/URL 배치 처리**(요약, 추출, 분류)
* **검색 결과 다건 스코어링** 후 상위 N개 선별

---

## 실전 팁

* **`reducer` 필수**: 리스트/딕셔너리로 결과를 모을 땐
  `Annotated[list[T], operator.add]`처럼 병합 규칙을 반드시 지정
* **입력 크기 제어**: 하위 작업이 너무 많으면 rate limit에 걸릴 수 있어 **batch 크기**나 **동시성**을 제한
* **결정성**: 병렬 실행 순서는 미보장일 수 있어 **정렬 키**(예: 원래 인덱스) 보관 권장
* **에러 처리**: 각 `qa`에서 예외를 잡아 **에러 메시지도 결과로** 남기면 디버깅 용이

---

## 한 줄 정의

> **Fan-out/Fan-in**: 하나의 입력을 여러 하위 작업으로 **동시에 실행**하고, **reducer 규칙**으로 결과를 **합쳐** 다음 단계로 넘기는 그래프 패턴


