# LangChain을 사용한 Auto-GPT 에이전트 구현

이 노트북에서는 LangChain 프레임워크를 사용하여 Auto-GPT 스타일의 자율 에이전트를 구현합니다.

## 주요 특징:
- 자율적 목표 설정 및 계획 수립
- 웹 검색 및 파일 관리 도구 활용
- 벡터 데이터베이스를 통한 장기 메모리
- ReAct (Reasoning + Acting) 패턴 적용

## 1. 필요한 라이브러리 설치 및 Import

In [None]:
# 필요한 패키지 설치
!pip install langchain-experimental
!pip install langchain-openai
!pip install langchain-community
!pip install faiss-cpu
!pip install google-search-results
!pip install tiktoken
!pip install python-dotenv

In [None]:
# 필요한 라이브러리 Import
import os
import faiss
from dotenv import load_dotenv

# LangChain Core
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate, ChatPromptTemplate

# LangChain Experimental - AutoGPT
from langchain_experimental.autonomous_agents.autogpt.agent import AutoGPT
from langchain_experimental.autonomous_agents.autogpt.prompt import AutoGPTPrompt
from langchain_experimental.autonomous_agents.autogpt.prompt_generator import get_prompt

# LangChain Tools
from langchain.agents import Tool
from langchain.tools.file_management.write import WriteFileTool
from langchain.tools.file_management.read import ReadFileTool
from langchain_community.utilities import SerpAPIWrapper

# Vector Store and Memory
from langchain_community.vectorstores import FAISS
from langchain.docstore import InMemoryDocstore
from langchain_openai import OpenAIEmbeddings

# 기타
import json
import time
from typing import List, Dict, Any

load_dotenv()

## 2. API 키 설정

In [None]:
# API 키 설정
# .env 파일에 다음과 같이 저장하거나 직접 입력:
# OPENAI_API_KEY=your_openai_api_key
# SERPAPI_API_KEY=your_serpapi_key

# 환경 변수에서 API 키 가져오기
openai_api_key = os.getenv("OPENAI_API_KEY")
serpapi_key = os.getenv("SERPAPI_API_KEY")

# API 키가 없는 경우 직접 입력
if not openai_api_key:
    openai_api_key = input("OpenAI API 키를 입력하세요: ")
    os.environ["OPENAI_API_KEY"] = openai_api_key

if not serpapi_key:
    serpapi_key = input("SerpAPI 키를 입력하세요 (선택사항, 웹 검색용): ")
    if serpapi_key:
        os.environ["SERPAPI_API_KEY"] = serpapi_key

print("✅ API 키 설정 완료")


## 3. 도구(Tools) 설정

In [None]:
# 웹 검색 도구 설정
def setup_search_tool():
    """웹 검색 도구를 설정합니다."""
    if serpapi_key:
        search = SerpAPIWrapper(serpapi_api_key=serpapi_key)
        return Tool(
            name="search",
            func=search.run,
            description="최신 정보나 실시간 정보가 필요할 때 웹 검색을 수행합니다. 정확한 키워드로 검색하세요."
        )
    else:
        # SerpAPI 키가 없는 경우 더미 검색 함수
        def dummy_search(query: str) -> str:
            return f"검색 결과: {query}에 대한 정보를 찾을 수 없습니다. SerpAPI 키가 필요합니다."
        
        return Tool(
            name="search",
            func=dummy_search,
            description="웹 검색 도구 (SerpAPI 키 필요)"
        )

# 계산기 도구
def calculator(expression: str) -> str:
    """간단한 수학 계산을 수행합니다."""
    try:
        # 안전한 계산을 위해 eval 대신 제한된 연산만 허용
        allowed_chars = set('0123456789+-*/(). ')
        if all(c in allowed_chars for c in expression):
            result = eval(expression)
            return f"계산 결과: {expression} = {result}"
        else:
            return "잘못된 수식입니다. 숫자와 기본 연산자만 사용하세요."
    except Exception as e:
        return f"계산 오류: {str(e)}"

# 현재 시간 확인 도구
def get_current_time() -> str:
    """현재 시간을 반환합니다."""
    from datetime import datetime
    return f"현재 시간: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"

# 도구들 설정
search_tool = setup_search_tool()
write_file_tool = WriteFileTool()
read_file_tool = ReadFileTool()

calculator_tool = Tool(
    name="calculator",
    func=calculator,
    description="수학 계산을 수행합니다. 예: '2+2' 또는 '10*5-3'"
)

time_tool = Tool(
    name="current_time",
    func=get_current_time,
    description="현재 날짜와 시간을 확인합니다."
)

# 모든 도구를 리스트로 정리
tools = [
    search_tool,
    write_file_tool,
    read_file_tool,
    calculator_tool,
    time_tool
]

print(f"✅ 총 {len(tools)}개의 도구가 설정되었습니다:")
for tool in tools:
    print(f"  - {tool.name}: {tool.description}")

## 4. 벡터 메모리 시스템 설정

In [None]:
# 임베딩 모델과 벡터 스토어 설정
def setup_memory_system():
    """FAISS를 사용한 벡터 메모리 시스템을 설정합니다."""
    
    # OpenAI 임베딩 모델 초기화
    embeddings_model = OpenAIEmbeddings(
        openai_api_key=openai_api_key,
        model="text-embedding-3-small"
    )
    
    # 임베딩 차원 (OpenAI text-embedding-3-small의 경우 1536)
    embedding_size = 1536
    
    # FAISS 인덱스 초기화 (L2 거리 사용)
    index = faiss.IndexFlatL2(embedding_size)
    
    # 벡터 스토어 생성
    vectorstore = FAISS(
        embedding_function=embeddings_model.embed_query,
        index=index,
        docstore=InMemoryDocstore({}),
        index_to_docstore_id={}
    )
    
    return vectorstore

# 메모리 시스템 설정
vectorstore = setup_memory_system()
print("✅ 벡터 메모리 시스템이 설정되었습니다.")


## 5. LLM 모델 설정

In [None]:
# ChatOpenAI 모델 설정
def setup_llm(model_name="gpt-4o-mini", temperature=0.1):
    """OpenAI 언어 모델을 설정합니다."""
    
    llm = ChatOpenAI(
        openai_api_key=openai_api_key,
        model_name=model_name,
        temperature=temperature,
        max_tokens=2000
    )
    
    return llm

# LLM 모델 초기화
llm = setup_llm()
print(f"✅ {llm.model_name} 모델이 설정되었습니다.")


## 6. Auto-GPT 에이전트 생성

In [None]:
# Auto-GPT 에이전트 클래스 정의
class AutoGPTAgent:
    def __init__(self, ai_name: str, ai_role: str, llm, tools, vectorstore):
        self.ai_name = ai_name
        self.ai_role = ai_role
        self.llm = llm
        self.tools = tools
        self.vectorstore = vectorstore
        
        # AutoGPT 에이전트 생성
        self.agent = AutoGPT.from_llm_and_tools(
            ai_name=self.ai_name,
            ai_role=self.ai_role,
            tools=self.tools,
            llm=self.llm,
            memory=self.vectorstore.as_retriever(search_kwargs={"k": 5})
        )
    
    def run(self, goals: List[str], max_iterations: int = 10) -> str:
        """주어진 목표들을 실행합니다."""
        print(f"🤖 {self.ai_name} 에이전트가 작업을 시작합니다...")
        print(f"📋 목표: {goals}")
        print(f"🔄 최대 반복 횟수: {max_iterations}")
        print("\n" + "="*50)
        
        try:
            result = self.agent.run(goals)
            return result
        except Exception as e:
            error_msg = f"에이전트 실행 중 오류 발생: {str(e)}"
            print(f"❌ {error_msg}")
            return error_msg
    
    def add_memory(self, text: str):
        """메모리에 텍스트를 추가합니다."""
        from langchain.docstore.document import Document
        doc = Document(page_content=text)
        self.vectorstore.add_documents([doc])
        print(f"💾 메모리에 정보가 저장되었습니다: {text[:100]}...")

## 7. 에이전트 인스턴스 생성 및 설정

In [None]:
# Auto-GPT 에이전트 생성
def create_autogpt_agent(ai_name: str = "ResearchGPT", 
                        ai_role: str = "연구 및 분석 전문 AI 어시스턴트"):
    """Auto-GPT 에이전트를 생성합니다."""
    
    agent = AutoGPTAgent(
        ai_name=ai_name,
        ai_role=ai_role,
        llm=llm,
        tools=tools,
        vectorstore=vectorstore
    )
    
    return agent

# 에이전트 인스턴스 생성
autogpt_agent = create_autogpt_agent()

print(f"✅ {autogpt_agent.ai_name} 에이전트가 생성되었습니다!")
print(f"🎯 역할: {autogpt_agent.ai_role}")


## 8. 사용 예제 1: 간단한 연구 작업

In [None]:
# 예제 1: 간단한 연구 및 파일 생성 작업
def example_1_research_task():
    """간단한 연구 작업 예제"""
    
    goals = [
        "Python의 주요 특징에 대해 조사하고 정리하기",
        "조사한 내용을 'python_features.txt' 파일로 저장하기",
        "파일이 제대로 저장되었는지 확인하기"
    ]
    
    print("📝 예제 1: Python 특징 연구 작업")
    result = autogpt_agent.run(goals, max_iterations=15)
    
    print("\n🏁 작업 완료!")
    print(f"결과: {result}")
    
    return result

# 예제 실행
result_1 = example_1_research_task()

## 9. 사용 예제 2: 계산과 분석 작업

In [None]:
# 예제 2: 계산과 분석 작업
def example_2_calculation_task():
    """계산과 분석 작업 예제"""
    
    goals = [
        "현재 시간을 확인하기",
        "1부터 100까지의 합을 계산하기",
        "계산 결과와 현재 시간을 포함한 보고서를 'calculation_report.txt' 파일로 작성하기"
    ]
    
    print("🔢 예제 2: 계산 및 보고서 작성 작업")
    result = autogpt_agent.run(goals, max_iterations=10)
    
    print("\n🏁 작업 완료!")
    print(f"결과: {result}")
    
    return result

# 예제 실행
result_2 = example_2_calculation_task()

## 10. 사용 예제 3: 복합 작업 (검색 + 분석 + 저장)

In [None]:
# 예제 3: 복합 작업 - 검색, 분석, 저장
def example_3_complex_task():
    """복합 작업 예제: 정보 검색, 분석, 파일 저장"""
    
    goals = [
        "인공지능의 최신 동향에 대해 조사하기",
        "조사한 정보를 요약하고 분석하기",
        "분석 결과를 'ai_trends_analysis.txt' 파일로 저장하기",
        "저장된 파일의 내용을 확인하여 올바르게 저장되었는지 검증하기"
    ]
    
    print("🔍 예제 3: AI 동향 조사 및 분석 작업")
    result = autogpt_agent.run(goals, max_iterations=20)
    
    print("\n🏁 작업 완료!")
    print(f"결과: {result}")
    
    return result

# 예제 실행
result_3 = example_3_complex_task()

## 11. 사용자 정의 작업 실행

In [None]:
# 사용자가 직접 목표를 설정하여 실행하는 함수
def run_custom_task():
    """사용자 정의 작업을 실행합니다."""
    
    print("🎯 사용자 정의 작업을 설정하세요.")
    print("여러 개의 목표를 입력할 수 있습니다. 각 목표를 한 줄씩 입력하고, 완료하면 빈 줄을 입력하세요.\n")
    
    goals = []
    goal_num = 1
    
    while True:
        goal = input(f"목표 {goal_num}: ").strip()
        if not goal:
            break
        goals.append(goal)
        goal_num += 1
    
    if not goals:
        print("❌ 목표가 입력되지 않았습니다.")
        return
    
    # 최대 반복 횟수 설정
    max_iterations = int(input("최대 반복 횟수를 입력하세요 (기본값: 15): ") or 15)
    
    print(f"\n🚀 작업을 시작합니다...")
    result = autogpt_agent.run(goals, max_iterations=max_iterations)
    
    print("\n🏁 사용자 정의 작업 완료!")
    print(f"결과: {result}")
    
    return result

# 사용자 정의 작업 실행 (주석을 해제하여 실행)
# custom_result = run_custom_task()

## 12. 고급 기능: 메모리 관리

In [None]:
# 메모리 관리 기능
def manage_memory():
    """에이전트의 메모리를 관리합니다."""
    
    print("🧠 메모리 관리 시스템")
    print("1. 메모리에 정보 추가")
    print("2. 메모리 검색")
    print("3. 메모리 상태 확인")
    
    choice = input("선택하세요 (1-3): ").strip()
    
    if choice == "1":
        info = input("메모리에 추가할 정보를 입력하세요: ")
        autogpt_agent.add_memory(info)
        
    elif choice == "2":
        query = input("검색할 내용을 입력하세요: ")
        retriever = autogpt_agent.vectorstore.as_retriever(search_kwargs={"k": 3})
        docs = retriever.get_relevant_documents(query)
        
        print(f"\n🔍 '{query}'에 대한 검색 결과:")
        for i, doc in enumerate(docs, 1):
            print(f"{i}. {doc.page_content[:200]}...")
            
    elif choice == "3":
        # 메모리 상태 확인
        total_docs = len(autogpt_agent.vectorstore.docstore._dict)
        print(f"💾 현재 메모리에 저장된 문서 수: {total_docs}")
        
    else:
        print("❌ 잘못된 선택입니다.")

# 메모리 관리 실행 (주석을 해제하여 실행)
# manage_memory()

## 13. 성능 모니터링 및 디버깅

In [None]:
# 성능 모니터링 기능
import time
from datetime import datetime

class PerformanceMonitor:
    def __init__(self):
        self.execution_logs = []
        self.start_time = None
        self.end_time = None
    
    def start_monitoring(self, task_name: str):
        self.start_time = time.time()
        print(f"⏱️  '{task_name}' 모니터링 시작: {datetime.now().strftime('%H:%M:%S')}")
    
    def end_monitoring(self, task_name: str):
        self.end_time = time.time()
        duration = self.end_time - self.start_time
        
        log_entry = {
            "task": task_name,
            "start_time": self.start_time,
            "end_time": self.end_time,
            "duration": duration,
            "timestamp": datetime.now().isoformat()
        }
        
        self.execution_logs.append(log_entry)
        print(f"⏱️  '{task_name}' 완료: {duration:.2f}초 소요")
        
    def get_performance_report(self):
        if not self.execution_logs:
            return "수행된 작업이 없습니다."
        
        report = "\n📊 성능 보고서\n" + "="*30 + "\n"
        total_time = sum(log["duration"] for log in self.execution_logs)
        avg_time = total_time / len(self.execution_logs)
        
        report += f"총 작업 수: {len(self.execution_logs)}\n"
        report += f"총 실행 시간: {total_time:.2f}초\n"
        report += f"평균 실행 시간: {avg_time:.2f}초\n\n"
        
        report += "작업별 실행 시간:\n"
        for i, log in enumerate(self.execution_logs, 1):
            report += f"{i}. {log['task']}: {log['duration']:.2f}초\n"
        
        return report

# 성능 모니터 인스턴스 생성
performance_monitor = PerformanceMonitor()

# 모니터링이 포함된 작업 실행 함수
def run_monitored_task(goals: List[str], task_name: str = "Custom Task"):
    """성능 모니터링과 함께 작업을 실행합니다."""
    
    performance_monitor.start_monitoring(task_name)
    
    try:
        result = autogpt_agent.run(goals, max_iterations=15)
        performance_monitor.end_monitoring(task_name)
        return result
    except Exception as e:
        performance_monitor.end_monitoring(task_name)
        print(f"❌ 작업 실행 중 오류: {str(e)}")
        return None

# 성능 보고서 출력
def show_performance_report():
    print(performance_monitor.get_performance_report())

print("✅ 성능 모니터링 시스템이 준비되었습니다.")


## 14. 전체 시스템 테스트

In [None]:
# 전체 시스템 종합 테스트
def comprehensive_test():
    """Auto-GPT 시스템의 종합적인 테스트를 수행합니다."""
    
    print("🧪 Auto-GPT 시스템 종합 테스트 시작")
    print("=" * 50)
    
    # 테스트 케이스들
    test_cases = [
        {
            "name": "기본 파일 조작",
            "goals": [
                "현재 시간을 확인하기",
                "시간 정보를 'current_time.txt' 파일에 저장하기",
                "저장된 파일을 읽어서 내용 확인하기"
            ]
        },
        {
            "name": "계산 및 분석",
            "goals": [
                "10 + 20 * 3을 계산하기",
                "계산 결과를 분석하고 설명하기",
                "결과를 'calculation_analysis.txt' 파일로 저장하기"
            ]
        }
    ]
    
    results = {}
    
    for test_case in test_cases:
        print(f"\n🔧 테스트: {test_case['name']}")
        print("-" * 30)
        
        try:
            result = run_monitored_task(test_case["goals"], test_case["name"])
            results[test_case["name"]] = {"success": True, "result": result}
            print(f"✅ {test_case['name']} 테스트 성공")
        except Exception as e:
            results[test_case["name"]] = {"success": False, "error": str(e)}
            print(f"❌ {test_case['name']} 테스트 실패: {str(e)}")
    
    # 테스트 결과 요약
    print("\n📋 테스트 결과 요약")
    print("=" * 50)
    
    success_count = sum(1 for r in results.values() if r["success"])
    total_count = len(results)
    
    print(f"총 테스트 수: {total_count}")
    print(f"성공한 테스트: {success_count}")
    print(f"실패한 테스트: {total_count - success_count}")
    print(f"성공률: {(success_count/total_count)*100:.1f}%")
    
    # 성능 보고서 출력
    show_performance_report()
    
    return results

# 종합 테스트 실행
test_results = comprehensive_test()

## 15. 시스템 정보 및 도움말

In [None]:
# 시스템 정보 및 도움말 함수
def show_system_info():
    """시스템 정보를 출력합니다."""
    
    print("🤖 Auto-GPT 에이전트 시스템 정보")
    print("=" * 50)
    print(f"에이전트 이름: {autogpt_agent.ai_name}")
    print(f"에이전트 역할: {autogpt_agent.ai_role}")
    print(f"사용 중인 LLM: {autogpt_agent.llm.model_name}")
    print(f"등록된 도구 수: {len(autogpt_agent.tools)}")
    
    print("\n🛠️  사용 가능한 도구들:")
    for i, tool in enumerate(autogpt_agent.tools, 1):
        print(f"{i}. {tool.name}: {tool.description}")
    
    print("\n💾 메모리 시스템:")
    total_docs = len(autogpt_agent.vectorstore.docstore._dict)
    print(f"저장된 문서 수: {total_docs}")
    print("벡터 데이터베이스: FAISS")
    print("임베딩 모델: OpenAI text-embedding-3-small")

def show_usage_guide():
    """사용 가이드를 출력합니다."""
    
    print("📖 Auto-GPT 에이전트 사용 가이드")
    print("=" * 50)
    
    print("1. 기본 사용법:")
    print("   - 목표 리스트를 작성합니다")
    print("   - autogpt_agent.run(goals)를 호출합니다")
    
    print("2. 목표 작성 팁:")
    print("   - 구체적이고 명확한 목표를 설정하세요")
    print("   - 단계별로 나누어 작성하세요")
    print("   - 검증 가능한 결과를 포함하세요")
    
    print("3. 예제 목표들:")
    print("   - '특정 주제에 대해 조사하고 보고서 작성하기'")
    print("   - '계산을 수행하고 결과를 파일로 저장하기'")
    print("   - '웹에서 정보를 검색하고 요약하기'")
    
    print("4. 주의사항:")
    print("   - API 키가 올바르게 설정되었는지 확인하세요")
    print("   - 복잡한 작업의 경우 충분한 반복 횟수를 설정하세요")
    print("   - 파일 작업 시 권한을 확인하세요")

# 시스템 정보 및 가이드 출력
show_system_info()
print("\n")
show_usage_guide()

## 16. 마무리 및 정리

In [None]:
# 정리 함수
def cleanup_and_summary():
    """생성된 파일들을 정리하고 요약 정보를 출력합니다."""
    
    import os
    import glob
    
    print("🧹 시스템 정리 및 요약")
    print("=" * 50)
    
    # 생성된 파일들 확인
    txt_files = glob.glob("*.txt")
    if txt_files:
        print(f"📁 생성된 파일들 ({len(txt_files)}개):")
        for file in txt_files:
            size = os.path.getsize(file)
            print(f"  - {file} ({size} bytes)")
    else:
        print("📁 생성된 텍스트 파일이 없습니다.")
    
    # 메모리 상태
    total_docs = len(autogpt_agent.vectorstore.docstore._dict)
    print(f"\n💾 메모리 상태: {total_docs}개의 문서가 저장됨")
    
    # 성능 요약
    if performance_monitor.execution_logs:
        print(f"\n⏱️  총 {len(performance_monitor.execution_logs)}개의 작업이 실행됨")
        total_time = sum(log["duration"] for log in performance_monitor.execution_logs)
        print(f"총 실행 시간: {total_time:.2f}초")
    
    print("\n✅ Auto-GPT 에이전트 시스템이 성공적으로 구현되고 테스트되었습니다!")
    print("\n💡 추가 개발 아이디어:")
    print("  - 더 다양한 도구 추가 (데이터베이스 연결, API 호출 등)")
    print("  - GUI 인터페이스 개발")
    print("  - 작업 스케줄링 기능 추가")
    print("  - 다중 에이전트 협업 시스템 구축")

# 최종 정리
cleanup_and_summary()