# 3차시: RAG 실무 구현

## 학습 목표
- 문서 처리 파이프라인 구축
- 벡터 임베딩 및 유사도 검색 구현
- RAG 기반 질의응답 시스템 구축
- 실제 PDF 문서를 활용한 챗봇 테스트

## 목차
1. [환경 설정](#환경-설정)
2. [문서 처리 파이프라인](#문서-처리-파이프라인)
3. [벡터 임베딩 시스템](#벡터-임베딩-시스템)
4. [검색 시스템](#검색-시스템)
5. [RAG 응답 생성](#rag-응답-생성)
6. [통합 테스트](#통합-테스트)

## 환경 설정

필요한 라이브러리를 설치하고 기본 설정을 진행합니다.

In [None]:
# 필요한 라이브러리 설치 (주석 해제하여 실행)
# !pip install pypdf2 python-docx faiss-cpu sentence-transformers chromadb

print("라이브러리 설치 완료!")

In [None]:
#!/usr/bin/env python3
"""
AI 챗봇 멘토링 - 3차시: RAG 시스템 구현
Author: AI Chatbot Workshop
Date: 2024-08-30
Description: 문서 기반 검색 증강 생성(RAG) 챗봇 구현
"""

import os
import sys
import logging
import json
import time
from typing import List, Dict, Any, Optional, Tuple, Union
from dataclasses import dataclass, asdict
from pathlib import Path
import asyncio
from datetime import datetime
import hashlib
import uuid

# 외부 라이브러리
import streamlit as st
import numpy as np
import pandas as pd
from openai import OpenAI

# 문서 처리
import PyPDF2
import docx
from sentence_transformers import SentenceTransformer

# 벡터 데이터베이스
import faiss
import chromadb
from chromadb.config import Settings

# 로컬 모듈
sys.path.append('..')
from config import get_config

# 설정 로드
config = get_config()
logger = logging.getLogger(__name__)

print("모든 라이브러리 임포트 완료!")

## 1. 문서 처리 파이프라인

### 1.1 문서 파서 구현
다양한 형식의 문서를 처리할 수 있는 파서를 구현합니다.

In [None]:
@dataclass
class Document:
    """문서 데이터 구조"""
    id: str
    content: str
    metadata: Dict[str, Any]
    source: str
    created_at: datetime

class DocumentProcessor:
    """문서 처리 파이프라인"""
    
    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.supported_formats = ['.pdf', '.txt', '.docx', '.md']
    
    def process_file(self, file_path: str) -> Document:
        """
        파일을 처리하여 Document 객체로 변환
        
        Args:
            file_path (str): 처리할 파일 경로
            
        Returns:
            Document: 처리된 문서 객체
        """
        file_path = Path(file_path)
        
        if not file_path.exists():
            raise FileNotFoundError(f"파일을 찾을 수 없습니다: {file_path}")
        
        extension = file_path.suffix.lower()
        
        if extension not in self.supported_formats:
            raise ValueError(f"지원되지 않는 파일 형식: {extension}")
        
        self.logger.info(f"문서 처리 시작: {file_path}")
        start_time = time.time()
        
        try:
            if extension == '.pdf':
                content = self._extract_pdf(file_path)
            elif extension == '.docx':
                content = self._extract_docx(file_path)
            else:  # .txt, .md
                content = self._extract_text(file_path)
            
            # 문서 메타데이터 생성
            metadata = {
                'filename': file_path.name,
                'file_size': file_path.stat().st_size,
                'file_type': extension,
                'word_count': len(content.split()),
                'char_count': len(content),
                'processing_time': time.time() - start_time
            }
            
            document = Document(
                id=str(uuid.uuid4()),
                content=content,
                metadata=metadata,
                source=str(file_path),
                created_at=datetime.now()
            )
            
            self.logger.info(f"문서 처리 완료: {metadata['word_count']}개 단어")
            return document
            
        except Exception as e:
            self.logger.error(f"문서 처리 실패: {e}")
            raise
    
    def _extract_pdf(self, file_path: Path) -> str:
        """PDF 파일에서 텍스트 추출"""
        text = ""
        
        with open(file_path, 'rb') as file:
            reader = PyPDF2.PdfReader(file)
            
            for page_num, page in enumerate(reader.pages):
                try:
                    page_text = page.extract_text()
                    text += f"\n--- 페이지 {page_num + 1} ---\n{page_text}"
                except Exception as e:
                    self.logger.warning(f"페이지 {page_num + 1} 처리 실패: {e}")
        
        return text.strip()
    
    def _extract_docx(self, file_path: Path) -> str:
        """DOCX 파일에서 텍스트 추출"""
        doc = docx.Document(file_path)
        text = "\n".join([paragraph.text for paragraph in doc.paragraphs])
        return text.strip()
    
    def _extract_text(self, file_path: Path) -> str:
        """텍스트 파일에서 내용 추출"""
        with open(file_path, 'r', encoding='utf-8') as file:
            return file.read().strip()

# 문서 처리 테스트
processor = DocumentProcessor()

# 샘플 텍스트 파일 생성 (테스트용)
sample_text = """
인공지능과 머신러닝의 발전

인공지능(AI)은 컴퓨터가 인간과 같은 지능적인 행동을 하도록 만드는 기술입니다.
머신러닝은 인공지능의 한 분야로, 데이터를 통해 패턴을 학습하여 예측이나 결정을 내리는 방법입니다.

최근 딥러닝의 발전으로 자연어 처리, 이미지 인식, 음성 인식 등 다양한 분야에서 괄목할 만한 성과를 보이고 있습니다.
특히 ChatGPT와 같은 대화형 AI는 일반 사용자들도 쉽게 활용할 수 있게 되었습니다.

RAG(Retrieval-Augmented Generation)는 검색 기반 생성 모델로, 외부 지식 베이스에서 관련 정보를 검색하여
더욱 정확하고 최신의 정보를 바탕으로 답변을 생성하는 기술입니다.
"""

# 샘플 파일 생성
sample_file = Path("sample_document.txt")
with open(sample_file, 'w', encoding='utf-8') as f:
    f.write(sample_text)

# 문서 처리 테스트
document = processor.process_file(sample_file)

print(f"문서 ID: {document.id}")
print(f"문서 길이: {len(document.content)} 문자")
print(f"메타데이터: {document.metadata}")
print(f"\n문서 내용 (처음 200자):")
print(document.content[:200] + "...")

# 임시 파일 삭제
sample_file.unlink()

### 1.2 스마트 청킹 구현
문서를 의미 있는 단위로 분할하는 청킹 알고리즘을 구현합니다.

In [None]:
@dataclass
class DocumentChunk:
    """문서 청크 데이터 구조"""
    id: str
    content: str
    doc_id: str
    chunk_index: int
    metadata: Dict[str, Any]

class SmartChunker:
    """스마트 문서 청킹 시스템"""
    
    def __init__(self, chunk_size: int = 500, overlap: int = 50):
        """
        청킹 시스템 초기화
        
        Args:
            chunk_size (int): 청크 크기 (문자 수)
            overlap (int): 청크 간 중복 크기
        """
        self.chunk_size = chunk_size
        self.overlap = overlap
        self.logger = logging.getLogger(__name__)
    
    def chunk_document(self, document: Document) -> List[DocumentChunk]:
        """
        문서를 청크로 분할
        
        Args:
            document (Document): 분할할 문서
            
        Returns:
            List[DocumentChunk]: 분할된 청크 목록
        """
        self.logger.info(f"문서 청킹 시작: {document.id}")
        start_time = time.time()
        
        # 문단 기반 1차 분할
        paragraphs = self._split_by_paragraphs(document.content)
        
        # 크기 기반 2차 분할
        chunks = self._split_by_size(paragraphs)
        
        # 청크 객체 생성
        chunk_objects = []
        for i, chunk_content in enumerate(chunks):
            chunk = DocumentChunk(
                id=f"{document.id}_chunk_{i}",
                content=chunk_content.strip(),
                doc_id=document.id,
                chunk_index=i,
                metadata={
                    'char_count': len(chunk_content),
                    'word_count': len(chunk_content.split()),
                    'source_file': document.metadata.get('filename', ''),
                    'chunk_method': 'smart_chunking'
                }
            )
            chunk_objects.append(chunk)
        
        processing_time = time.time() - start_time
        self.logger.info(
            f"청킹 완료: {len(chunk_objects)}개 청크 생성, "
            f"처리 시간: {processing_time:.2f}초"
        )
        
        return chunk_objects
    
    def _split_by_paragraphs(self, text: str) -> List[str]:
        """문단 기준으로 텍스트 분할"""
        # 개행 문자 기준으로 분할
        paragraphs = [p.strip() for p in text.split('\n\n') if p.strip()]
        
        # 너무 짧은 문단들은 합치기
        merged_paragraphs = []
        current_para = ""
        
        for para in paragraphs:
            if len(current_para + para) <= self.chunk_size:
                current_para += "\n\n" + para if current_para else para
            else:
                if current_para:
                    merged_paragraphs.append(current_para)
                current_para = para
        
        if current_para:
            merged_paragraphs.append(current_para)
        
        return merged_paragraphs
    
    def _split_by_size(self, paragraphs: List[str]) -> List[str]:
        """크기 기준으로 텍스트 분할 (슬라이딩 윈도우 적용)"""
        chunks = []
        
        for para in paragraphs:
            if len(para) <= self.chunk_size:
                chunks.append(para)
            else:
                # 큰 문단을 작은 청크로 분할
                for i in range(0, len(para), self.chunk_size - self.overlap):
                    chunk = para[i:i + self.chunk_size]
                    if chunk.strip():
                        chunks.append(chunk)
        
        return chunks

# 청킹 시스템 테스트
chunker = SmartChunker(chunk_size=200, overlap=50)

# 더 긴 샘플 문서 생성
long_sample = """
인공지능의 역사와 발전

인공지능의 개념은 1950년대 앨런 튜링의 "튜링 테스트"로부터 시작되었습니다. 
이후 1956년 다트머스 컨퍼런스에서 "인공지능"이라는 용어가 공식적으로 사용되기 시작했습니다.

1980년대에는 전문가 시스템이 인기를 얻었지만, 지식을 수동으로 입력해야 하는 한계가 있었습니다.
1990년대부터 머신러닝 접근법이 주목받기 시작했고, 데이터로부터 자동으로 학습하는 방법이 발전했습니다.

딥러닝의 혁신

2010년대 들어 딥러닝이 컴퓨터 비전과 자연어 처리 분야에서 혁신적인 성과를 보였습니다.
합성곱 신경망(CNN)은 이미지 인식에서, 순환 신경망(RNN)과 LSTM은 시계열 데이터 처리에서 뛰어난 성능을 보였습니다.

2017년 구글이 발표한 트랜스포머 아키텍처는 자연어 처리 분야를 완전히 바꾸어 놓았습니다.
어텐션 메커니즘을 통해 문맥을 더 잘 이해할 수 있게 되었고, 이는 BERT, GPT 같은 모델의 기반이 되었습니다.

현재의 생성형 AI

ChatGPT의 등장으로 생성형 AI가 일반 대중에게 널리 알려지게 되었습니다.
대화형 인터페이스를 통해 누구나 쉽게 AI와 상호작용할 수 있게 되었습니다.
RAG(Retrieval-Augmented Generation) 기술은 외부 지식과 결합하여 더 정확한 정보를 제공할 수 있게 해줍니다.
"""

# 긴 문서로 테스트
long_doc = Document(
    id="test_doc_001",
    content=long_sample,
    metadata={'filename': 'ai_history.txt'},
    source="test",
    created_at=datetime.now()
)

# 청킹 실행
chunks = chunker.chunk_document(long_doc)

print(f"\n총 {len(chunks)}개의 청크가 생성되었습니다:")
print("-" * 50)

for i, chunk in enumerate(chunks):
    print(f"\n청크 {i+1} (ID: {chunk.id}):")
    print(f"길이: {chunk.metadata['char_count']}자, {chunk.metadata['word_count']}단어")
    print(f"내용: {chunk.content[:100]}...")
    print("-" * 30)

## 2. 벡터 임베딩 시스템

### 2.1 임베딩 생성기 구현
텍스트를 벡터로 변환하는 임베딩 시스템을 구현합니다.

In [None]:
class EmbeddingGenerator:
    """텍스트 임베딩 생성 시스템"""
    
    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        """
        임베딩 생성기 초기화
        
        Args:
            model_name (str): 사용할 sentence transformer 모델명
        """
        self.model_name = model_name
        self.logger = logging.getLogger(__name__)
        
        try:
            self.logger.info(f"임베딩 모델 로딩: {model_name}")
            self.model = SentenceTransformer(model_name)
            self.embedding_dim = self.model.get_sentence_embedding_dimension()
            self.logger.info(f"모델 로딩 완료. 차원: {self.embedding_dim}")
        except Exception as e:
            self.logger.error(f"모델 로딩 실패: {e}")
            raise
    
    def generate_embedding(self, text: str) -> np.ndarray:
        """
        단일 텍스트에 대한 임베딩 생성
        
        Args:
            text (str): 임베딩할 텍스트
            
        Returns:
            np.ndarray: 생성된 임베딩 벡터
        """
        try:
            start_time = time.time()
            embedding = self.model.encode(text, convert_to_numpy=True)
            
            self.logger.debug(
                f"임베딩 생성 완료: {len(text)}자 -> {embedding.shape}, "
                f"처리 시간: {time.time() - start_time:.3f}초"
            )
            
            return embedding
            
        except Exception as e:
            self.logger.error(f"임베딩 생성 실패: {e}")
            raise
    
    def generate_batch_embeddings(self, texts: List[str]) -> np.ndarray:
        """
        여러 텍스트에 대한 배치 임베딩 생성
        
        Args:
            texts (List[str]): 임베딩할 텍스트 목록
            
        Returns:
            np.ndarray: 생성된 임베딩 행렬
        """
        try:
            self.logger.info(f"{len(texts)}개 텍스트 배치 임베딩 생성 시작")
            start_time = time.time()
            
            embeddings = self.model.encode(
                texts, 
                convert_to_numpy=True,
                show_progress_bar=True
            )
            
            processing_time = time.time() - start_time
            self.logger.info(
                f"배치 임베딩 생성 완료: {embeddings.shape}, "
                f"처리 시간: {processing_time:.2f}초 "
                f"(평균 {processing_time/len(texts):.3f}초/텍스트)"
            )
            
            return embeddings
            
        except Exception as e:
            self.logger.error(f"배치 임베딩 생성 실패: {e}")
            raise
    
    def compute_similarity(self, embedding1: np.ndarray, 
                         embedding2: np.ndarray) -> float:
        """
        두 임베딩 간 코사인 유사도 계산
        
        Args:
            embedding1 (np.ndarray): 첫 번째 임베딩
            embedding2 (np.ndarray): 두 번째 임베딩
            
        Returns:
            float: 코사인 유사도 (-1 ~ 1)
        """
        # 코사인 유사도 = dot(A, B) / (norm(A) * norm(B))
        dot_product = np.dot(embedding1, embedding2)
        norm_a = np.linalg.norm(embedding1)
        norm_b = np.linalg.norm(embedding2)
        
        if norm_a == 0 or norm_b == 0:
            return 0.0
        
        return dot_product / (norm_a * norm_b)

# 임베딩 생성기 테스트
print("임베딩 모델 로딩 중... (처음 실행시 다운로드로 시간이 걸릴 수 있습니다)")
embedding_generator = EmbeddingGenerator()

# 테스트 텍스트들
test_texts = [
    "인공지능은 컴퓨터가 인간과 같은 지능을 가지도록 하는 기술입니다.",
    "머신러닝은 데이터로부터 패턴을 학습하는 인공지능의 한 분야입니다.",
    "오늘 날씨가 정말 좋네요. 산책하기에 완벽한 날입니다.",
    "RAG는 검색과 생성을 결합한 혁신적인 AI 기술입니다."
]

# 단일 임베딩 테스트
single_embedding = embedding_generator.generate_embedding(test_texts[0])
print(f"\n단일 임베딩 결과:")
print(f"차원: {single_embedding.shape}")
print(f"첫 5개 값: {single_embedding[:5]}")

# 배치 임베딩 테스트
batch_embeddings = embedding_generator.generate_batch_embeddings(test_texts)
print(f"\n배치 임베딩 결과:")
print(f"형태: {batch_embeddings.shape}")

# 유사도 계산 테스트
print(f"\n유사도 매트릭스:")
print(f"{'':30} ", end="")
for i in range(len(test_texts)):
    print(f"텍스트{i+1:2}", end="  ")
print()

for i, text_i in enumerate(test_texts):
    print(f"텍스트{i+1} ({text_i[:20]}...) ", end="")
    for j in range(len(test_texts)):
        similarity = embedding_generator.compute_similarity(
            batch_embeddings[i], batch_embeddings[j]
        )
        print(f"{similarity:6.3f}", end="  ")
    print()

### 2.2 벡터 데이터베이스 구현
FAISS와 ChromaDB를 활용한 벡터 검색 시스템을 구현합니다.

In [None]:
@dataclass
class SearchResult:
    """검색 결과 데이터 구조"""
    chunk: DocumentChunk
    score: float
    rank: int

class VectorDatabase:
    """벡터 데이터베이스 인터페이스"""
    
    def __init__(self, embedding_generator: EmbeddingGenerator):
        self.embedding_generator = embedding_generator
        self.logger = logging.getLogger(__name__)
        self.chunks: List[DocumentChunk] = []
        self.embeddings: Optional[np.ndarray] = None
        
    def add_chunks(self, chunks: List[DocumentChunk]):
        """청크들을 데이터베이스에 추가"""
        self.logger.info(f"{len(chunks)}개 청크 추가 시작")
        start_time = time.time()
        
        # 텍스트 추출
        texts = [chunk.content for chunk in chunks]
        
        # 임베딩 생성
        new_embeddings = self.embedding_generator.generate_batch_embeddings(texts)
        
        # 기존 데이터와 합치기
        if self.embeddings is None:
            self.embeddings = new_embeddings
            self.chunks = chunks.copy()
        else:
            self.embeddings = np.vstack([self.embeddings, new_embeddings])
            self.chunks.extend(chunks)
        
        processing_time = time.time() - start_time
        self.logger.info(
            f"청크 추가 완료: 총 {len(self.chunks)}개 청크, "
            f"임베딩 형태: {self.embeddings.shape}, "
            f"처리 시간: {processing_time:.2f}초"
        )
    
    def search(self, query: str, top_k: int = 5) -> List[SearchResult]:
        """
        쿼리와 유사한 청크들을 검색
        
        Args:
            query (str): 검색 쿼리
            top_k (int): 반환할 결과 수
            
        Returns:
            List[SearchResult]: 검색 결과 목록
        """
        if self.embeddings is None or len(self.chunks) == 0:
            self.logger.warning("데이터베이스가 비어있습니다")
            return []
        
        self.logger.info(f"검색 시작: '{query[:50]}...', top_k={top_k}")
        start_time = time.time()
        
        # 쿼리 임베딩 생성
        query_embedding = self.embedding_generator.generate_embedding(query)
        
        # 모든 청크와의 유사도 계산
        similarities = []
        for i, chunk_embedding in enumerate(self.embeddings):
            similarity = self.embedding_generator.compute_similarity(
                query_embedding, chunk_embedding
            )
            similarities.append((similarity, i))
        
        # 유사도 순으로 정렬
        similarities.sort(key=lambda x: x[0], reverse=True)
        
        # 상위 k개 결과 생성
        results = []
        for rank, (score, chunk_idx) in enumerate(similarities[:top_k]):
            result = SearchResult(
                chunk=self.chunks[chunk_idx],
                score=score,
                rank=rank + 1
            )
            results.append(result)
        
        search_time = time.time() - start_time
        self.logger.info(
            f"검색 완료: {len(results)}개 결과, "
            f"최고 점수: {results[0].score:.4f}, "
            f"검색 시간: {search_time:.3f}초"
        )
        
        return results
    
    def get_stats(self) -> Dict[str, Any]:
        """데이터베이스 통계 정보"""
        if not self.chunks:
            return {"total_chunks": 0, "total_documents": 0}
        
        doc_ids = set(chunk.doc_id for chunk in self.chunks)
        
        return {
            "total_chunks": len(self.chunks),
            "total_documents": len(doc_ids),
            "embedding_dimension": self.embeddings.shape[1] if self.embeddings is not None else 0,
            "avg_chunk_length": np.mean([len(chunk.content) for chunk in self.chunks]),
            "total_characters": sum(len(chunk.content) for chunk in self.chunks)
        }

# 벡터 데이터베이스 테스트
vector_db = VectorDatabase(embedding_generator)

# 이전에 생성한 청크들을 데이터베이스에 추가
vector_db.add_chunks(chunks)

# 데이터베이스 통계
stats = vector_db.get_stats()
print(f"\n벡터 데이터베이스 통계:")
for key, value in stats.items():
    print(f"  {key}: {value}")

# 검색 테스트
test_queries = [
    "인공지능의 역사는 언제부터 시작되었나요?",
    "딥러닝과 머신러닝의 차이점은 무엇인가요?",
    "트랜스포머 아키텍처에 대해 알려주세요",
    "RAG 기술이 무엇인가요?"
]

for query in test_queries:
    print(f"\n🔍 검색 쿼리: '{query}'")
    print("-" * 60)
    
    results = vector_db.search(query, top_k=3)
    
    for result in results:
        print(f"\n순위 {result.rank} (유사도: {result.score:.4f})")
        print(f"청크 ID: {result.chunk.id}")
        print(f"내용: {result.chunk.content[:150]}...")
        print("-" * 40)