# **Langfuse 모니터링**: LLM Observability

- Langfuse는 **LLM 애플리케이션의 관찰성(Observability)** 옵션을 제공하는 오픈소스 도구임.

- 개발자가 LLM 체인과 에이전트를 **효과적으로 디버깅**하고 모니터링할 수 있도록 지원함.

- 주요 기능으로는 체인 실행 **로깅 및 추적**이 포함됨.

- **프롬프트 디버깅**과 성능 측정 및 분석 기능을 제공함.

### Langfuse 환경 설정

📦 계정 가입 및 설정

- Langfuse 가입 필요 (https://cloud.langfuse.com) 또는 셀프 호스팅
- 오픈소스 (무료) 및 클라우드 버전 (일부 유료)

- .env 파일에 환경 변수 설정
    ```
    LANGFUSE_ENABLED=true
    LANGFUSE_HOST=your_langfuse_host_url
    LANGFUSE_SECRET_KEY=your_secret_key
    LANGFUSE_PUBLIC_KEY=your_public_key
    ```

In [142]:
from dotenv import load_dotenv
load_dotenv()

# Langfuse 초기화 및 설정 확인
import os

print("Langfuse 설정 확인:")
print(f"LANGFUSE_ENABLED: {os.getenv('LANGFUSE_ENABLED')}")
print(f"LANGFUSE_HOST: {os.getenv('LANGFUSE_HOST')}")
print(f"PUBLIC_KEY 존재 여부: {'Yes' if os.getenv('LANGFUSE_PUBLIC_KEY') else 'No'}")
print(f"SECRET_KEY 존재 여부: {'Yes' if os.getenv('LANGFUSE_SECRET_KEY') else 'No'}")

# Langfuse 클라이언트 초기화 (init_langfuse.py 방식 참고)
try:
    from langfuse import Langfuse
    from langfuse.langchain import CallbackHandler
    
    # 명시적 파라미터로 클라이언트 생성
    langfuse_client = Langfuse(
        public_key=os.getenv('LANGFUSE_PUBLIC_KEY'),
        secret_key=os.getenv('LANGFUSE_SECRET_KEY'),
        host=os.getenv('LANGFUSE_HOST')
    )
    
    print(f"\n✅ Langfuse 클라이언트 초기화 완료")
    
    # CallbackHandler 여러 방법으로 시도 (init_langfuse.py와 동일한 방식)
    langfuse_handler = None
    
    try:
        # 방법 1: 기본 초기화 시도
        langfuse_handler = CallbackHandler()
        print("✅ CallbackHandler 초기화 성공 (기본)")
    except Exception as e1:
        try:
            # 방법 2: client 매개변수 사용
            langfuse_handler = CallbackHandler(client=langfuse_client)
            print("✅ CallbackHandler 초기화 성공 (client)")
        except Exception as e2:
            try:
                # 방법 3: 직접 매개변수 전달
                langfuse_handler = CallbackHandler(
                    public_key=os.getenv('LANGFUSE_PUBLIC_KEY'),
                    secret_key=os.getenv('LANGFUSE_SECRET_KEY'),
                    host=os.getenv('LANGFUSE_HOST')
                )
                print("✅ CallbackHandler 초기화 성공 (직접)")
            except Exception as e3:
                print(f"❌ CallbackHandler 초기화 실패: {e1}, {e2}, {e3}")
                langfuse_handler = None
    
    # 전역 변수로 저장
    globals()['langfuse_handler'] = langfuse_handler
    globals()['langfuse_client'] = langfuse_client
    
except Exception as e:
    print(f"\n❌ Langfuse 초기화 실패: {e}")
    globals()['langfuse_handler'] = None
    globals()['langfuse_client'] = None

Langfuse 설정 확인:
LANGFUSE_ENABLED: true
LANGFUSE_HOST: http://reai.kro.kr:3000
PUBLIC_KEY 존재 여부: Yes
SECRET_KEY 존재 여부: Yes

✅ Langfuse 클라이언트 초기화 완료
✅ CallbackHandler 초기화 성공 (기본)


In [143]:
# Langfuse 연결 상태 및 추적 테스트 (개선된 버전)
import requests
import json
from datetime import datetime

print("🔍 Langfuse 연결 및 추적 상태 진단")
print("=" * 50)

# 1. 서버 연결 테스트
try:
    host = os.getenv('LANGFUSE_HOST')
    response = requests.get(f"{host}/api/public/health", timeout=10)
    print(f"✅ 서버 연결: {response.status_code} - {host}")
except Exception as e:
    print(f"❌ 서버 연결 실패: {e}")

# 2. 인증 정보 확인
public_key = os.getenv('LANGFUSE_PUBLIC_KEY')
secret_key = os.getenv('LANGFUSE_SECRET_KEY')

if public_key and secret_key:
    print(f"✅ 인증 정보: PUBLIC_KEY({public_key[:10]}...), SECRET_KEY({secret_key[:10]}...)")
else:
    print("❌ 인증 정보 누락")

# 3. 버전별 API 호출 테스트
try:
    from langfuse import Langfuse
    
    # 클라이언트 생성
    client = Langfuse(
        public_key=public_key,
        secret_key=secret_key,
        host=host
    )
    
    # 새로운 API 방식으로 트레이스 생성
    try:
        trace = client.create_trace(
            name="direct_api_test",
            metadata={"test_time": datetime.now().isoformat()}
        )
        print(f"✅ create_trace API 호출 성공")
        print(f"📍 트레이스 ID: {trace.id}")
        print(f"🔗 웹에서 확인: {host}/trace/{trace.id}")
    except Exception as e:
        print(f"❌ create_trace 실패: {e}")
        
        # 구버전 방식 시도
        try:
            trace = client.trace(
                name="direct_api_test_old",
                metadata={"test_time": datetime.now().isoformat()}
            )
            print(f"✅ 구버전 trace API 호출 성공")
            print(f"📍 트레이스 ID: {trace.id}")
        except Exception as e2:
            print(f"❌ 구버전 trace도 실패: {e2}")
    
except Exception as e:
    print(f"❌ Langfuse 클라이언트 생성 실패: {e}")

# 4. CallbackHandler 테스트
try:
    from langfuse.langchain import CallbackHandler
    
    handler = CallbackHandler(
        public_key=public_key,
        secret_key=secret_key,
        host=host,
        debug=True
    )
    print(f"✅ CallbackHandler 초기화 성공")
    print(f"🎯 LangChain을 통한 추적이 가능합니다")
    
except Exception as e:
    print(f"❌ CallbackHandler 초기화 실패: {e}")

print("\n" + "=" * 50)
print("진단 완료! CallbackHandler가 작동한다면 LLM 호출시 추적됩니다.")

🔍 Langfuse 연결 및 추적 상태 진단
✅ 서버 연결: 200 - http://reai.kro.kr:3000
✅ 인증 정보: PUBLIC_KEY(pk-lf-b247...), SECRET_KEY(sk-lf-ce56...)
❌ create_trace 실패: 'Langfuse' object has no attribute 'create_trace'
❌ 구버전 trace도 실패: 'Langfuse' object has no attribute 'trace'
❌ CallbackHandler 초기화 실패: LangchainCallbackHandler.__init__() got an unexpected keyword argument 'secret_key'

진단 완료! CallbackHandler가 작동한다면 LLM 호출시 추적됩니다.


---

# LCEL(LangChain Expression Language) 

- **LCEL**은 `|` 연산자를 사용해 컴포넌트들을 순차적으로 연결하는 선언적 체이닝을 지원합니다

- **재사용성**이 높아 정의된 체인을 다른 체인의 컴포넌트로 활용할 수 있습니다

- **다양한 실행 방식**(.invoke(), .batch(), .stream(), .astream())으로 동기/비동기 처리가 가능합니다

- **배치 처리**시 자동 최적화를 통해 효율적인 작업 수행이 가능합니다


#### 1) **Prompt + LLM**

* 기본 구조: `Prompt | LLM` 형태로, 파이프(|) 연산자를 사용해 프롬프트와 LLM을 순차적으로 연결합니다.

* 데이터 흐름: 사용자 입력이 Prompt 템플릿을 통해 처리된 후, LLM에 전달되어 최종 응답이 생성됩니다.

* 실행 순서: 파이프라인은 왼쪽에서 오른쪽으로 순차적으로 실행되며, 각 컴포넌트의 출력이 다음 컴포넌트의 입력으로 전달됩니다.

In [144]:
from langchain_openai import ChatOpenAI
import os

# 전역에서 langfuse_handler 가져오기
try:
    langfuse_handler = globals().get('langfuse_handler')
    if langfuse_handler:
        print("✅ 저장된 Langfuse CallbackHandler 사용")
    else:
        print("❌ Langfuse CallbackHandler가 없습니다. 추적 없이 진행합니다.")
except:
    print("❌ Langfuse CallbackHandler 로드 실패. 추적 없이 진행합니다.")
    langfuse_handler = None

# LLM model with Langfuse tracking
if langfuse_handler:
    llm = ChatOpenAI(
        model="gpt-4o-mini", 
        temperature=0.3, 
        top_p=0.95,
        callbacks=[langfuse_handler]
    )
    print("🎯 Langfuse 추적이 활성화된 LLM 생성 완료")
else:
    llm = ChatOpenAI(
        model="gpt-4o-mini", 
        temperature=0.3, 
        top_p=0.95
    )
    print("⚪ 추적 없는 LLM 생성 완료")

# 모델에 프롬프트를 입력 (init_langfuse.py 방식처럼 config 사용)
print("\n📝 LLM 호출 시작...")
try:
    if langfuse_handler:
        response = llm.invoke(
            "탄소의 원자 번호는 무엇인가요?",
            config={"callbacks": [langfuse_handler]}
        )
    else:
        response = llm.invoke("탄소의 원자 번호는 무엇인가요?")
        
    print("✅ LLM 호출 완료")
    print(f"📋 응답: {response.content}")
    
    if langfuse_handler:
        print(f"\n🔍 Langfuse 웹에서 확인: {os.getenv('LANGFUSE_HOST')}/traces")
        print("💡 몇 초 후 웹 인터페이스에 트레이스가 나타납니다.")
        
except Exception as e:
    print(f"❌ LLM 호출 실패: {e}")

# 전역 변수로 저장
globals()['llm'] = llm

✅ 저장된 Langfuse CallbackHandler 사용
🎯 Langfuse 추적이 활성화된 LLM 생성 완료

📝 LLM 호출 시작...
✅ LLM 호출 완료
📋 응답: 탄소의 원자 번호는 6입니다. 이는 탄소 원자가 6개의 양성자를 가지고 있음을 의미합니다.

🔍 Langfuse 웹에서 확인: http://reai.kro.kr:3000/traces
💡 몇 초 후 웹 인터페이스에 트레이스가 나타납니다.
✅ LLM 호출 완료
📋 응답: 탄소의 원자 번호는 6입니다. 이는 탄소 원자가 6개의 양성자를 가지고 있음을 의미합니다.

🔍 Langfuse 웹에서 확인: http://reai.kro.kr:3000/traces
💡 몇 초 후 웹 인터페이스에 트레이스가 나타납니다.


In [145]:
# 응답 객체 확인
response

AIMessage(content='탄소의 원자 번호는 6입니다. 이는 탄소 원자가 6개의 양성자를 가지고 있음을 의미합니다.', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 28, 'prompt_tokens': 18, 'total_tokens': 46, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-4o-mini-2024-07-18', 'system_fingerprint': 'fp_560af6e559', 'id': 'chatcmpl-C9tCpKusW5NhT2XlbahHyp3GEm3V0', 'service_tier': 'default', 'finish_reason': 'stop', 'logprobs': None}, id='run--a2dd77cc-676e-424d-88f8-380ef037ed4a-0', usage_metadata={'input_tokens': 18, 'output_tokens': 28, 'total_tokens': 46, 'input_token_details': {'audio': 0, 'cache_read': 0}, 'output_token_details': {'audio': 0, 'reasoning': 0}})

In [146]:
# 응답 객체의 텍스트를 확인
response.content

'탄소의 원자 번호는 6입니다. 이는 탄소 원자가 6개의 양성자를 가지고 있음을 의미합니다.'

In [147]:
# 응답 객체의 메타데이터를 확인
response.response_metadata

{'token_usage': {'completion_tokens': 28,
  'prompt_tokens': 18,
  'total_tokens': 46,
  'completion_tokens_details': {'accepted_prediction_tokens': 0,
   'audio_tokens': 0,
   'reasoning_tokens': 0,
   'rejected_prediction_tokens': 0},
  'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}},
 'model_name': 'gpt-4o-mini-2024-07-18',
 'system_fingerprint': 'fp_560af6e559',
 'id': 'chatcmpl-C9tCpKusW5NhT2XlbahHyp3GEm3V0',
 'service_tier': 'default',
 'finish_reason': 'stop',
 'logprobs': None}

**Langfuse에서 추적 결과 확인**

- Langfuse 웹 인터페이스에서 LLM 호출이 추적됨
- 프롬프트, 응답, 메타데이터가 모두 기록됨
- 성능 및 비용 분석 가능

In [148]:
from langchain_core.prompts import PromptTemplate

# 템플릿 문자열 정의
template = """
당신은 {topic} 분야의 전문가입니다. {topic}에 관한 다음 질문에 답변해주세요.
질문: {question}
답변: """

# PromptTemplate 객체 생성
prompt = PromptTemplate.from_template(template)

print("템플릿 변수:")
print(f"- 필수 변수: {prompt.input_variables}")

템플릿 변수:
- 필수 변수: ['question', 'topic']


In [149]:
# chain을 구성
chain = prompt | llm

# chain 객체 확인
chain

PromptTemplate(input_variables=['question', 'topic'], input_types={}, partial_variables={}, template='\n당신은 {topic} 분야의 전문가입니다. {topic}에 관한 다음 질문에 답변해주세요.\n질문: {question}\n답변: ')
| ChatOpenAI(callbacks=[<langfuse.langchain.CallbackHandler.LangchainCallbackHandler object at 0x000001DEEC73D3D0>], client=<openai.resources.chat.completions.completions.Completions object at 0x000001DEEC4FD790>, async_client=<openai.resources.chat.completions.completions.AsyncCompletions object at 0x000001DEEC4FDE80>, root_client=<openai.OpenAI object at 0x000001DEEC4FDF10>, root_async_client=<openai.AsyncOpenAI object at 0x000001DEEC4FDFD0>, model_name='gpt-4o-mini', temperature=0.3, model_kwargs={}, openai_api_key=SecretStr('**********'), top_p=0.95)

In [150]:
# chain 객체의 입력 스키마를 확인
chain.input_schema.model_json_schema() 

{'properties': {'question': {'title': 'Question', 'type': 'string'},
  'topic': {'title': 'Topic', 'type': 'string'}},
 'required': ['question', 'topic'],
 'title': 'PromptInput',
 'type': 'object'}

In [151]:
# chain 실행
response = chain.invoke( 
    {
        "topic": "화학(Chemistry)", 
        "question": "탄소의 원자 번호는 무엇인가요?"
    }
)

In [152]:
# 응답 객체를 출력
response

AIMessage(content='탄소의 원자 번호는 6입니다.', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 11, 'prompt_tokens': 54, 'total_tokens': 65, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-4o-mini-2024-07-18', 'system_fingerprint': 'fp_560af6e559', 'id': 'chatcmpl-C9tCqxVruYw92FMmLNGDaxRRPrJSN', 'service_tier': 'default', 'finish_reason': 'stop', 'logprobs': None}, id='run--0121a642-531a-4aa0-9278-0afebdd20542-0', usage_metadata={'input_tokens': 54, 'output_tokens': 11, 'total_tokens': 65, 'input_token_details': {'audio': 0, 'cache_read': 0}, 'output_token_details': {'audio': 0, 'reasoning': 0}})

In [153]:
# 응답 객체의 텍스트를 출력
response.content

'탄소의 원자 번호는 6입니다.'

In [154]:
# 응답 객체의 메타데이터를 출력
response.response_metadata

{'token_usage': {'completion_tokens': 11,
  'prompt_tokens': 54,
  'total_tokens': 65,
  'completion_tokens_details': {'accepted_prediction_tokens': 0,
   'audio_tokens': 0,
   'reasoning_tokens': 0,
   'rejected_prediction_tokens': 0},
  'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}},
 'model_name': 'gpt-4o-mini-2024-07-18',
 'system_fingerprint': 'fp_560af6e559',
 'id': 'chatcmpl-C9tCqxVruYw92FMmLNGDaxRRPrJSN',
 'service_tier': 'default',
 'finish_reason': 'stop',
 'logprobs': None}

**Langfuse에서 체인 실행 추적**

- 체인의 각 단계(Prompt → LLM → OutputParser)가 추적됨
- 실행 시간과 토큰 사용량 분석 가능
- 디버깅과 최적화에 유용한 정보 제공

#### **2) Prompt + LLM + Output Parser**

* 데이터 파이프라인: `Prompt | LLM | OutputParser` 형태로 구성되어 LLM의 출력을 구조화된 형식으로 변환합니다.

* Parser 종류: JSON, XML 등 다양한 형식의 파서를 지원하여 LLM 출력을 원하는 데이터 구조로 변환할 수 있습니다.

* 유효성 검증: Parser가 출력 형식을 검증하여 잘못된 형식의 응답을 필터링하고 안정적인 데이터 처리를 보장합니다.

In [155]:
### 문자열 출력 파서 

from langchain_core.output_parsers import StrOutputParser

# 출력 파서를 생성
output_parser = StrOutputParser()

# 출력 파서를 실행
output_parser.invoke(response)

'탄소의 원자 번호는 6입니다.'

In [156]:
# 출력 파서를 chain에 추가
chain = prompt | llm | output_parser

# chain을 실행
response = chain.invoke(
    {
        "topic": "화학(Chemistry)", 
        "question": "탄소의 원자 번호는 무엇인가요?"
    }
)

# 응답 객체를 출력
response

'탄소의 원자 번호는 6입니다.'

In [157]:
# input_schema (chain)
chain.input_schema.model_json_schema()

{'properties': {'question': {'title': 'Question', 'type': 'string'},
  'topic': {'title': 'Topic', 'type': 'string'}},
 'required': ['question', 'topic'],
 'title': 'PromptInput',
 'type': 'object'}

In [158]:
# input_schema (prompt)
prompt.input_schema.model_json_schema()

{'properties': {'question': {'title': 'Question', 'type': 'string'},
  'topic': {'title': 'Topic', 'type': 'string'}},
 'required': ['question', 'topic'],
 'title': 'PromptInput',
 'type': 'object'}

**Langfuse에서 파이프라인 추적**

- Output Parser가 추가된 전체 파이프라인 추적
- 각 컴포넌트의 입력/출력 데이터 확인 가능
- 성능 병목 지점 식별 용이

---
# **Runnable**

* 실행 인터페이스: 모든 LangChain 컴포넌트는 Runnable 인터페이스를 구현하여 일관된 방식으로 실행됩니다.

* 실행 메서드: `.invoke()`, `.batch()`, `.stream()`, `.astream()` 등 다양한 실행 방식을 제공합니다.

* 호환성: 모든 Runnable 컴포넌트는 파이프(|) 연산자를 통해 연결 가능하며, 재사용이 용이합니다.

* Runnable의 주요 유형:

    * `RunnableSequence`: 여러 Runnable을 순차적으로 실행
    * `RunnablePassthrough`: 입력을 그대로 다음 단계로 전달    
    * `RunnableParallel`: 여러 Runnable을 병렬로 실행
    * `RunnableLambda`: 파이썬 함수를 Runnable로 래핑하여 체인에서 사용

#### 1) **RunnableSequence**

- **RunnableSequence**는 컴포넌트들을 연결하여 순차적으로 데이터를 처리하는 체인입니다

- `|` 연산자로 연결된 각 단계의 **출력이 다음 단계의 입력**으로 전달됩니다

- **다양한 실행 방식**(동기/비동기, 배치/스트리밍)을 지원합니다

- LLM 체인, 데이터 파이프라인, 자동화된 작업 등 **다단계 처리**에 활용됩니다

In [159]:
from langchain_core.runnables import RunnableSequence
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
import os

# 저장된 langfuse_handler 사용
langfuse_handler = globals().get('langfuse_handler')

# 컴포넌트 정의
prompt = PromptTemplate.from_template("'{text}'를 영어로 번역해주세요. 번역된 문장만을 출력해주세요.")

# 모델 정의 (init_langfuse.py 방식으로 callbacks 설정)
if langfuse_handler:
    model = ChatOpenAI(
        model="gpt-4o-mini", 
        temperature=0.3, 
        top_p=0.95,
        callbacks=[langfuse_handler]
    )
    print("✅ Langfuse 추적이 활성화된 번역 모델 생성")
else:
    model = ChatOpenAI(
        model="gpt-4o-mini", 
        temperature=0.3, 
        top_p=0.95
    )
    print("⚪ 추적 없는 번역 모델 생성")

output_parser = StrOutputParser()

# RunnableSequence 생성 - 함수 사용 
translation_chain = RunnableSequence(
    first=prompt,
    middle=[model],   # 리스트로 전달하는 점에 주의
    last=output_parser
)

print("🔗 번역 체인 생성 완료")

# RunnableSequence 생성 - 연산자 사용
# translation_chain = prompt | model | output_parser

✅ Langfuse 추적이 활성화된 번역 모델 생성
🔗 번역 체인 생성 완료


In [160]:
# 동기 실행
result = translation_chain.invoke({"text": "안녕하세요"})
print(result)  

Hello


**Langfuse에서 RunnableSequence 추적**

- 순차적 실행되는 각 단계가 명확히 추적됨
- 번역 작업의 전체 플로우 가시화
- 각 단계별 실행 시간 및 성능 분석 가능

#### 2) **RunnableParallel**

- **RunnableParallel**은 여러 컴포넌트를 딕셔너리 형태로 구성해 **동시 실행**합니다

- 동일한 입력이 모든 병렬 컴포넌트에 전달되며, 결과는 **키-값 쌍**으로 반환됩니다

- **데이터 변환**과 **파이프라인 구성**에 특화되어 있으며, 출력 형식을 다음 단계에 맞게 조정할 수 있습니다

`(1) 질문 분석 체인`

In [161]:
# 질문과 관련된 분야를 찾는 프롬프트
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser

# 출력 파서 정의
output_parser = StrOutputParser()

# 질문 템플릿 정의
question_template = """
다음 카테고리 중 하나로 입력을 분류하세요. 다른 불필요한 텍스트는 출력하지 마세요:
- 화학(Chemistry)
- 물리(Physics)
- 생물(Biology)

# 예시:
Q: 사람의 염색체는 모두 몇개가 있나요?
A: 생물(Biology)

Q: {question}
A: """

# 프롬프트 생성
question_prompt = PromptTemplate.from_template(question_template)

# 체인 구성
question_chain = question_prompt | llm | output_parser

# 체인 실행
result = question_chain.invoke({"question": "탄소의 원자 번호는 무엇인가요?"})
print(f"분류 결과: {result}")

분류 결과: 화학(Chemistry)


`(2) 언어 감지 체인`

In [162]:
# 질문에 사용된 언어를 구분하는 프롬프트
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser

# 출력 파서 정의
output_parser = StrOutputParser()

# 언어 분류 템플릿 정의
language_template = """
입력된 텍스트의 언어를 다음 카테고리 중 하나로 분류하세요. 다른 불필요한 텍스트는 출력하지 마세요:
- 영어(English)
- 한국어(Korean)
- 기타(Others)

# 예시:
Q: How many protons are in a carbon atom?
A: English

Q: 탄소의 원자 번호는 무엇인가요?
A: Korean

Q: ¿Cuál es el número atómico del carbono?
A: Others

입력: {question}
답변: """

# 프롬프트 생성
language_prompt = PromptTemplate.from_template(language_template)

# 체인 구성
language_chain = language_prompt | llm | output_parser

# 체인 실행 예시
examples = [
    "What is the atomic number of carbon?",
    "탄소의 원자 번호는 무엇인가요?",
    "¿Cuál es el número atómico del carbono?"
]

for example in examples:
    result = language_chain.invoke({"question": example})
    print(f"입력: {example}")
    print(f"분류 결과: {result}\n")

입력: What is the atomic number of carbon?
분류 결과: English

입력: 탄소의 원자 번호는 무엇인가요?
분류 결과: Korean

입력: 탄소의 원자 번호는 무엇인가요?
분류 결과: Korean

입력: ¿Cuál es el número atómico del carbono?
분류 결과: Others

입력: ¿Cuál es el número atómico del carbono?
분류 결과: Others



`(3) RunnableParallel을 사용한 병렬 실행 체인`

In [163]:
# 질문과 관련된 분야를 찾아서 질문에 대한 답변을 생성하는 프롬프트
from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import RunnableParallel
from langchain_core.output_parsers import StrOutputParser
from operator import itemgetter

# 답변 템플릿 정의
answer_template = """
당신은 {topic} 분야의 전문가입니다. {topic}에 관한 질문에 {language}로 답변해주세요.
질문: {question}
답변: """

# 프롬프트 및 체인 구성
answer_prompt = PromptTemplate.from_template(answer_template)
output_parser = StrOutputParser()

# 병렬 처리 체인 구성
answer_chain = RunnableParallel({
    "topic": question_chain,            # 주제 분류 체인
    "language": language_chain,         # 언어 감지 체인
    "question": itemgetter("question")  # 원본 질문 추출
}) | answer_prompt | llm | output_parser

# 체인 실행 예시
result = answer_chain.invoke({
    "question": "탄소의 원자 번호는 무엇인가요?"
})

print("처리 결과:")
print(f"답변: {result}")

처리 결과:
답변: 탄소의 원자 번호는 6입니다. 이는 탄소 원자가 6개의 양성자를 가지고 있음을 의미합니다.


**Langfuse에서 RunnableParallel 추적**

- 병렬 실행되는 체인들(주제 분류, 언어 감지)이 동시에 추적됨
- 각 병렬 작업의 실행 시간 비교 가능
- 복잡한 파이프라인의 성능 최적화에 유용

#### 3) **RunnablePassthrough**

- **RunnablePassthrough**는 입력값을 그대로 전달하여 원본 데이터를 보존합니다

- **RunnableParallel**과 함께 사용되어 입력 데이터를 새로운 키로 매핑할 수 있습니다

- **투명한 데이터 흐름**으로 파이프라인 디버깅과 구성이 용이합니다

In [164]:
from langchain_core.runnables import RunnablePassthrough
import re

runnable = RunnableParallel(
    passed=RunnablePassthrough(),
    modified=lambda x: int(re.search(r'\d+', x["query"]).group()),
)

runnable.invoke({"query": '탄소의 원자 번호는 6입니다.'})

{'passed': {'query': '탄소의 원자 번호는 6입니다.'}, 'modified': 6}

In [165]:
runnable = RunnableParallel(
    passed=RunnablePassthrough(),
    modified=lambda x: int(re.search(r'\d+', x).group()),
)

runnable.invoke('탄소의 원자 번호는 6입니다.')

{'passed': '탄소의 원자 번호는 6입니다.', 'modified': 6}

#### 4) **RunnableLambda**

- **RunnableLambda**는 일반 함수를 Runnable 객체로 변환하는 래퍼 컴포넌트입니다

- 체인에 **커스텀 로직**을 쉽게 통합할 수 있어 데이터 전처리, 후처리에 유용합니다

- `|` 연산자로 다른 컴포넌트들과 연결해 **복잡한 처리 흐름**을 구성할 수 있습니다

In [166]:
import re
from langchain_core.runnables import RunnableLambda, RunnablePassthrough

# 텍스트에서 숫자를 추출하는 함수
def extract_number(query):
    return int(re.search(r'\d+', query).group())

# RunnablePassthrough로 입력을 그대로 전달하고, RunnableLambda로 숫자 추출 함수 실행
runnable = RunnablePassthrough() | RunnableLambda(extract_number)

# 입력 텍스트에서 6을 추출
result = runnable.invoke('탄소의 원자 번호는 6입니다.')
print(result)  # 출력: 6

6


In [167]:
from langchain.schema.runnable import RunnableLambda
from langchain.prompts import ChatPromptTemplate

# 데이터 전처리 함수 정의
def preprocess_text(text: str) -> str:
    """ 입력 텍스트를 소문자로 변환하고 양쪽 공백을 제거합니다. """
    return text.strip().lower()

# 후처리 함수 정의
def postprocess_response(response: str) -> dict:
    """ 응답 텍스트를 대문자로 변환하고 길이를 계산합니다. """
    response_text = response.content
    return {
        "processed_response": response_text.upper(),
        "length": len(response_text)
    }

# 프롬프트 템플릿 생성
prompt = ChatPromptTemplate.from_template("다음 주제에 대해 영어 한 문장으로 설명해주세요: {topic}")

# 처리 파이프라인 구성
chain = (
    RunnableLambda(preprocess_text) |  # 입력 전처리
    prompt |                           # 프롬프트 포맷팅
    llm |                              # LLM 추론
    RunnableLambda(postprocess_response)  # 출력 후처리
)

# 체인 실행
result = chain.invoke("  Artificial Intelligence  ")
print(f"처리된 응답: {result['processed_response']}")
print(f"응답 길이: {result['length']}")

처리된 응답: ARTIFICIAL INTELLIGENCE REFERS TO THE SIMULATION OF HUMAN INTELLIGENCE IN MACHINES THAT ARE PROGRAMMED TO THINK, LEARN, AND PERFORM TASKS AUTONOMOUSLY.
응답 길이: 151


**Langfuse에서 RunnableLambda 추적**

- 커스텀 함수들(전처리, 후처리)도 추적됨
- 전체 데이터 변환 과정의 가시화
- 함수별 실행 시간 및 성능 분석 가능

---
# [실습]

- **다음과 같은 요구사항을 LCEL로 구현합니다**
   1. 사용자 입력을 받아 내용을 요약하기
   2. 요약된 내용을 기반으로 감정 분석하기 (긍정, 부정, 중립)
   3. 요약된 문장과 감정 분석 결과를 출력하기 

In [168]:
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel
from operator import itemgetter
import os

# 저장된 langfuse_handler 사용 (init_langfuse.py 방식)
langfuse_handler = globals().get('langfuse_handler')

if langfuse_handler:
    print("✅ Langfuse CallbackHandler 사용 (실습)")
else:
    print("❌ Langfuse CallbackHandler 없음 (추적 없이 진행)")

# 프롬프트 템플릿 정의
summarize_prompt = PromptTemplate.from_template(
    "다음 텍스트를 핵심 내용만 간단히 요약해주세요:\n\n{text}\n\n요약:"
)

sentiment_prompt = PromptTemplate.from_template(
    "다음 요약된 내용의 감정을 분석해주세요. '긍정', '부정', '중립' 중 하나로만 답변하세요:\n\n{summary}\n\n감정:"
)

# 문자열 출력 파서
output_parser = StrOutputParser()

# 체인 구성 (init_langfuse.py 방식으로 콜백 설정)
if langfuse_handler:
    model = ChatOpenAI(
        model="gpt-4o-mini", 
        temperature=0.3, 
        top_p=0.95,
        callbacks=[langfuse_handler]
    )
else:
    model = ChatOpenAI(
        model="gpt-4o-mini", 
        temperature=0.3, 
        top_p=0.95
    )

# 요약 체인
summarize_chain = summarize_prompt | model | output_parser

# 감정 분석 체인
sentiment_chain = sentiment_prompt | model | output_parser

# 전체 체인 - RunnableParallel 사용
chain = (
    {"text": itemgetter("text")} |  # 입력 텍스트 전달
    {"summary": summarize_chain} |  # 요약 실행
    RunnableParallel({
        "summary": itemgetter("summary"),  # 요약 결과 전달
        "sentiment": sentiment_chain      # 감정 분석 실행
    })
)

# 사용 예시 (인코딩 문제 해결을 위해 단순한 텍스트 사용)
text = """오늘 시험을 봤습니다. 준비를 열심히 했기 때문에 긴장했지만 문제를 잘 풀 수 있었습니다. 
결과적으로 만점을 받았고 매우 기뻤습니다. 
선생님께서도 칭찬해 주셔서 보람을 느꼈습니다. 
노력하면 좋은 결과가 따른다는 것을 다시 깨달았습니다."""

print("\n📝 실습 체인 실행 시작...")

try:
    # init_langfuse.py 방식으로 콜백 핸들러 사용
    if langfuse_handler:
        result = chain.invoke(
            {"text": text}, 
            config={"callbacks": [langfuse_handler]}
        )
    else:
        result = chain.invoke({"text": text})
        
    print("\n=== 처리 결과 ===")
    print(f"요약: {result['summary']}")
    print(f"감정: {result['sentiment']}")
    print("\n=== Langfuse에서 추적 결과를 확인하세요! ===")
    print(f"🔗 Langfuse 웹 인터페이스: {os.getenv('LANGFUSE_HOST')}")
    print("💡 잠시 후 웹 인터페이스에서 트레이스를 확인할 수 있습니다.")

    # 명시적으로 flush 시도 (init_langfuse.py 방식)
    try:
        langfuse_client = globals().get('langfuse_client')
        if langfuse_client:
            langfuse_client.flush()
            print("✅ Langfuse 데이터 전송 완료")
    except Exception as e:
        print(f"ℹ️  자동으로 데이터가 전송됩니다: {e}")

except Exception as e:
    print(f"❌ 체인 실행 실패: {e}")
    import traceback
    traceback.print_exc()

✅ Langfuse CallbackHandler 사용 (실습)

📝 실습 체인 실행 시작...

=== 처리 결과 ===
요약: 시험에서 만점을 받아 기쁘고, 노력의 중요성을 다시 깨달았다.
감정: 긍정

=== Langfuse에서 추적 결과를 확인하세요! ===
🔗 Langfuse 웹 인터페이스: http://reai.kro.kr:3000
💡 잠시 후 웹 인터페이스에서 트레이스를 확인할 수 있습니다.
✅ Langfuse 데이터 전송 완료

=== 처리 결과 ===
요약: 시험에서 만점을 받아 기쁘고, 노력의 중요성을 다시 깨달았다.
감정: 긍정

=== Langfuse에서 추적 결과를 확인하세요! ===
🔗 Langfuse 웹 인터페이스: http://reai.kro.kr:3000
💡 잠시 후 웹 인터페이스에서 트레이스를 확인할 수 있습니다.
✅ Langfuse 데이터 전송 완료
