In [13]:
# ==============================================================================

# 👩‍💻 Author    : Hyelim Jo
# 🎯 Purpose   : AI 윤리성 리스크 진단 에이전트 v1.0
# 📅 Created   : 2025-10-22
# 📜 Note      : evidence_collector.ipynb

# ==============================================================================


In [14]:
# -------------------------------- Update Log ----------------------------------

# 2025-10-22 16:00 / 초기 생성 / Evidence Collector 기본 구조 구현
# 2025-10-22 16:30 / RAG 메모리 설계 / Baseline + Issue 메모리 분리
# 2025-10-22 17:00 / HuggingFace 임베딩 적용 / 경제성 개선
# 2025-10-23 09:00 / 웹 크롤링 실제 구현 / Tavily Search API를 사용하여 최신 뉴스/논문 수집
# 2025-10-23 09:30 / Baseline 쿼리 강화 / EU, OECD, UNESCO 기준 명시 및 파일 구성에 맞춰 로드 로직 명확화

# ------------------------------------------------------------------------------


In [15]:
# step1. 라이브러리 불러오기
import os
import json
import requests
from datetime import datetime, timedelta
from typing import Dict, List, Any
from bs4 import BeautifulSoup
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_chroma import Chroma
from langchain_community.document_loaders import PyMuPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.prompts import PromptTemplate
from langchain_openai import ChatOpenAI # LLM 사용을 위해 필요
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.documents import Document
from dotenv import load_dotenv
from tavily import TavilyClient

# 환경 변수 로드
load_dotenv()

print("✅ 라이브러리 불러오기 완료!")

✅ 라이브러리 불러오기 완료!


In [16]:
# step2. 설정 및 경로 정의
# 데이터 경로 설정 (agents 폴더 내에서 실행 가정)
base_dir = os.path.join("..", "data")
reference_dir = os.path.join(base_dir, "reference")
crawled_dir = os.path.join(base_dir, "crawled")
processed_dir = os.path.join(base_dir, "processed")
baseline_embed_dir = os.path.join(base_dir, "embeddings", "baseline")
issue_embed_dir = os.path.join(base_dir, "embeddings", "issue")

# 디렉토리 생성
for dir_path in [crawled_dir, processed_dir, baseline_embed_dir, issue_embed_dir]:
    os.makedirs(dir_path, exist_ok=True)

print("✅ 경로 설정 완료!")

✅ 경로 설정 완료!


In [17]:
# step3. 임베딩 모델 및 LLM 초기화
embedding_model = HuggingFaceEmbeddings(
    model_name="sentence-transformers/all-MiniLM-L6-v2",
    model_kwargs={"device": "cpu"},
    encode_kwargs={"normalize_embeddings": True}
)
# 💡 LLM 초기화 (요약 및 평가에 사용)
try:
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
    print("✅ ChatOpenAI LLM 초기화 완료!")
except Exception as e:
    print(f"⚠️ ChatOpenAI 초기화 실패: {e}")
    llm = None

print("✅ HuggingFace 임베딩 모델 초기화 완료!")



✅ ChatOpenAI LLM 초기화 완료!
✅ HuggingFace 임베딩 모델 초기화 완료!


In [18]:
# step4. Baseline 메모리 구축 (EU, OECD, UNESCO 문서)
def build_baseline_memory():
    """공식 문서 기반 Baseline 메모리 구축"""
    baseline_docs = []
    
    # PDF 파일 로드
    pdf_files = [
        "EU_AI_Act.pdf",
        "OECD_Privacy_2024.pdf", 
        "UNESCO_Ethics_2021.pdf"
    ]
    
    for pdf_file in pdf_files:
        pdf_path = os.path.join(reference_dir, pdf_file)
        
        if os.path.exists(pdf_path):
            loader = PyMuPDFLoader(pdf_path)
            docs = loader.load()
            print(f"✅ {pdf_file} 로드 완료")
            
            # 메타데이터에 문서 타입 추가 및 페이지 번호 정보 포함
            for doc in docs:
                doc.metadata["document_type"] = "baseline"
                doc.metadata["source"] = pdf_file
                doc.metadata["page"] = doc.metadata.get("page", 0) + 1 # 페이지 번호는 1부터 시작
            baseline_docs.extend(docs)
        else:
            print(f"⚠️ {pdf_file} 파일이 지정된 경로에 없습니다: {pdf_path}")
    
    if not baseline_docs:
        print("❌ Baseline 문서를 로드하지 못했습니다. RAG가 Baseline 증거를 찾지 못할 수 있습니다.")
        split_docs = [Document(page_content="No official baseline documents loaded.", metadata={"source": "N/A", "document_type": "baseline", "page": 0})]
    else:
        # 텍스트 분할
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=500,
            chunk_overlap=50
        )
        split_docs = text_splitter.split_documents(baseline_docs)
    
    # ChromaDB에 저장
    baseline_vectorstore = Chroma.from_documents(
        documents=split_docs,
        embedding=embedding_model,
        persist_directory=baseline_embed_dir
    )
    
    print(f"✅ Baseline 메모리 구축 완료 ({len(split_docs)}개 청크)")
    return baseline_vectorstore

print("✅ Baseline 메모리 함수 정의 완료")

✅ Baseline 메모리 함수 정의 완료


In [19]:
# step5. 웹 크롤링 함수 정의 (Tavily 사용)
def crawl_web_content(keywords: List[str]) -> List[Dict[str, Any]]:
    # ... (Tavily search logic remains the same)
    tavily = TavilyClient(api_key=os.environ.get("TAVILY_API_KEY"))
    crawled_data = []
    search_queries = []
    for keyword in keywords:
        search_queries.extend([
            f"AI {keyword} 윤리 이슈",
            f"AI {keyword} 편향성 문제",
            f"AI {keyword} 개인정보보호",
        ])
    unique_queries = list(set(search_queries))[:5]
    
    for query in unique_queries:
        print(f"    - Tavily 검색 중: {query}...")
        try:
            results = tavily.search(
                query=query, 
                search_depth="advanced", 
                max_results=5, 
                include_raw_content=True
            )
            for result in results.get("results", []):
                if result.get("content"):
                    crawled_data.append({
                        "title": result.get("title", "No Title"),
                        "content": result["content"],
                        "source": result.get("url", "Unknown Source"),
                        "url": result.get("url", ""),
                        "date": datetime.now().strftime("%Y-%m-%d"),
                        "category": "issue"
                    })
        except Exception as e:
            print(f"⚠️ Tavily 검색 실패 ({query}): {e}")
            continue

    # 필터링 로직 (300자 이상, 선정적 표현 제거, 중복 URL 제거)
    filtered_data = []
    for item in crawled_data:
        if len(item["content"]) >= 300:
            if not any(word in item["content"].lower() for word in ["충격", "폭로", "clickbait", "논란의", "대박"]):
                if item["url"] not in [d.get("url") for d in filtered_data]:
                    filtered_data.append(item)

    print(f"✅ 웹 크롤링 완료 ({len(filtered_data)}개 문서)")
    return filtered_data

print("✅ 웹 크롤링 함수 정의 완료")

✅ 웹 크롤링 함수 정의 완료


In [20]:
# step6. Issue 메모리 구축 (웹 크롤링 결과를 RAG에 저장)
def build_issue_memory(keywords: List[str]):
    # ... (Issue memory build logic remains the same)
    crawled_data = crawl_web_content(keywords)
    issue_docs = []
    
    for item in crawled_data:
        doc = Document(
            page_content=f"[이슈: {item['category']}] {item['title']}\n\n{item['content']}",
            metadata={
                "document_type": "issue",
                "source": item["source"],
                "url": item["url"],
                "date": item["date"],
                "category": item["category"],
                "title": item["title"]
            }
        )
        issue_docs.append(doc)
    
    if issue_docs:
        issue_vectorstore = Chroma.from_documents(
            documents=issue_docs,
            embedding=embedding_model,
            persist_directory=issue_embed_dir
        )
        print(f"✅ Issue 메모리 구축 완료 ({len(issue_docs)}개 문서)")
        return issue_vectorstore
    else:
        print("⚠️ 크롤링된 데이터가 없습니다.")
        return None

print("✅ Issue 메모리 구축 함수 정의 완료")


# 💡 신규 함수 정의: LLM을 이용한 증거 요약
def summarize_evidence_with_llm(docs: List[Document], query: str) -> List[Dict[str, Any]]:
    """검색된 Document 목록을 LLM을 사용하여 요약하고 세부 정보와 결합합니다."""
    if not llm:
        print("⚠️ LLM이 초기화되지 않아 요약을 건너뜁니다.")
        return []

    summarized_results = []
    
    summary_prompt_template = """당신은 AI 윤리 리스크 진단 전문가입니다. 다음 정보를 분석하여 한국어로 3줄 이내의 간결하고 핵심적인 요약을 제공하세요.
    이 요약은 'AI 서비스 {query}의 윤리 리스크'에 대한 근거로 사용될 것입니다.
    ---
    문서 출처: {source} ({document_type}) {chunk_info}
    문서 내용: {content}
    ---
    요약:"""
    summary_prompt = PromptTemplate(template=summary_prompt_template, input_variables=["query", "source", "document_type", "content", "chunk_info"])

    for doc in docs:
        content = doc.page_content
        source = doc.metadata.get("source", doc.metadata.get("url", "Unknown"))
        doc_type = doc.metadata.get("document_type", "Unknown")
        category = doc.metadata.get("category", "N/A")

        # 문서 타입에 따른 청크 정보 설정
        if doc_type == "baseline":
            chunk_info = f"(페이지 {doc.metadata.get('page', 'N/A')}의 내용)"
            score = 0.8
        else: # issue
            chunk_info = "(웹 기사 원문)"
            score = 0.2

        # 프롬프트 구성 및 요약 생성
        prompt_value = summary_prompt.invoke({
            "query": query,
            "source": source,
            "document_type": doc_type,
            "content": content,
            "chunk_info": chunk_info
        })
        
        try:
            # LLM 호출
            summary_response = llm.invoke(prompt_value.to_string())
            summary = summary_response.content.strip()
        except Exception as e:
            summary = f"LLM 요약 실패. Error: {e}"
        
        # Risk Assessor 에이전트에 전달할 상세 구조
        summarized_results.append({
            "category": category,
            "document_type": doc_type,
            "source": source,
            "chunk_info": chunk_info, # PDF 페이지 또는 웹 기사 여부
            "score": score,
            "summary": summary, 
            "content_excerpt": content[:300] + "...", # 원문 내용의 일부 (너무 길어지지 않도록)
            "full_content": content # Risk Assessor에서 필요할 경우를 대비하여 전체 원문도 전달
        })
        
    return summarized_results

✅ Issue 메모리 구축 함수 정의 완료


In [21]:
# step7. 증거 수집 함수 정의 (가중치 8:2 적용)
def collect_evidence(service_profile: Dict[str, Any]) -> Dict[str, Any]:
    """
    서비스 프로파일 기반 증거 수집 (Baseline 0.8 : Issue 0.2)
    - Risk Assessor에게 전달할 증거 소스 목록 및 가중치 점수, 요약 포함
    """
    
    service_name = service_profile.get("service_name", "")
    risk_categories = service_profile.get("risk_categories", [])
    service_type = service_profile.get("service_type", "")
    
    print(f"\n🔍 증거 수집 시작: {service_name}")
    
    # 메모리 구축
    baseline_vectorstore = build_baseline_memory()
    issue_vectorstore = build_issue_memory(risk_categories)
    
    evidence_results = {
        "query": service_name,
        "weights": {"baseline": 0.8, "issue": 0.2},
        "scores": {},
        "baseline_sources": [],
        "issue_sources": []
    }
    
    all_docs_to_summarize = []
    
    # 1. 각 리스크 카테고리별 증거 검색
    for category in risk_categories:
        
        # Baseline 검색 쿼리 강화
        baseline_query = f"{service_name} {category} 리스크 {service_type} (EU AI Act, OECD, UNESCO 윤리 기준)"
        issue_query = f"최신 뉴스 논문 AI {service_name} {category} 문제"
        
        print(f"\n   📊 {category.upper()} 리스크 검색 중...")
        
        # Baseline 검색
        baseline_docs = baseline_vectorstore.similarity_search(baseline_query, k=3)
        
        # Issue 검색
        issue_docs = []
        if issue_vectorstore:
            issue_docs = issue_vectorstore.similarity_search(issue_query, k=2)
            
        # 검색된 문서를 요약 대상 리스트에 추가 (카테고리 메타데이터 부여)
        for doc in baseline_docs:
            doc.metadata['category'] = category
            all_docs_to_summarize.append(doc)
        for doc in issue_docs:
            doc.metadata['category'] = category
            all_docs_to_summarize.append(doc)
            
        # 종합 점수 계산 (참고용)
        baseline_weight = 0.8
        issue_weight = 0.2 if issue_docs else 0.0
        total_score = (len(baseline_docs) > 0) * baseline_weight + (len(issue_docs) > 0) * issue_weight
        evidence_results["scores"][category] = total_score
        
        print(f" - 검색된 Baseline 청크: {len(baseline_docs)}개")
        print(f" - 검색된 Issue 문서: {len(issue_docs)}개")

    print("\n📝 검색된 증거들을 LLM을 사용하여 요약 중...")
    
    # 2. 통합 요약 및 데이터 구조화
    summarized_evidences = summarize_evidence_with_llm(all_docs_to_summarize, service_name)
    
    # 3. 최종 결과 리스트에 추가
    for evidence in summarized_evidences:
        if evidence['document_type'] == 'baseline':
            evidence_results["baseline_sources"].append(evidence)
        elif evidence['document_type'] == 'issue':
            evidence_results["issue_sources"].append(evidence)
    
    print(f"\n✅ 증거 수집 및 요약 완료!")
    return evidence_results

print("✅ 증거 수집 함수 정의 완료")

✅ 증거 수집 함수 정의 완료


In [22]:
# step8. 테스트 실행
print("\n" + "="*60)
print("🔍 Evidence Collector 시작...")
print("="*60)

# Service Profiler에서 받은 결과 (예시)
test_service_profile = {
    "service_name": "이력서 분석 추천 시스템",
    "service_type": "recruitment system", 
    "description": "채용 지원자의 이력서를 AI로 분석하여 적합한 후보자를 추천하는 시스템입니다.",
    "risk_categories": ["bias", "privacy", "transparency"]
}

print(f"\n📝 분석할 서비스: {test_service_profile['description']}...")

# 증거 수집 실행
evidence_result = collect_evidence(test_service_profile)

print("\n" + "="*60)
print("📊 증거 수집 결과 요약 및 다음 에이전트 전달 내용")
print("="*60)
print(f"  - 서비스명: {evidence_result['query']}")
print(f"  - 가중치: Baseline {evidence_result['weights']['baseline']} : Issue {evidence_result['weights']['issue']}")
print(f"  - 수집된 리스크 카테고리: {list(evidence_result['scores'].keys())}")

print("\n------------------------------------------------------------")
print("🚨 Risk Assessor에 전달되는 데이터 구조 요약 (첫 번째 증거 예시)")
print("------------------------------------------------------------")

# 전달될 데이터 구조 예시 출력 (첫 번째 Baseline 소스와 첫 번째 Issue 소스)
if evidence_result['baseline_sources']:
    b_src = evidence_result['baseline_sources'][0]
    print("\n[Baseline 근거 예시]")
    print(f"  - 리스크 카테고리: {b_src['category'].upper()}")
    print(f"  - 출처: {b_src['source']} {b_src['chunk_info']}")
    print(f"  - **요약 (핵심 근거):** {b_src['summary']}")
    print(f"  - 원문 내용 (일부): {b_src['content_excerpt']}")

if evidence_result['issue_sources']:
    i_src = evidence_result['issue_sources'][0]
    print("\n[Issue 근거 예시]")
    print(f"  - 리스크 카테고리: {i_src['category'].upper()}")
    print(f"  - 출처: {i_src['source']} ({i_src.get('url', 'N/A')}) {i_src['chunk_info']}")
    print(f"  - **요약 (사회적 반응):** {i_src['summary']}")
    print(f"  - 원문 내용 (일부): {i_src['content_excerpt']}")

print("\n------------------------------------------------------------")
print(f"🔗 최종적으로 {len(evidence_result['baseline_sources']) + len(evidence_result['issue_sources'])}개의 상세 증거가 Risk Assessor로 전달됩니다.")
print("="*60)


🔍 Evidence Collector 시작...

📝 분석할 서비스: 채용 지원자의 이력서를 AI로 분석하여 적합한 후보자를 추천하는 시스템입니다....

🔍 증거 수집 시작: 이력서 분석 추천 시스템


✅ EU_AI_Act.pdf 로드 완료
✅ OECD_Privacy_2024.pdf 로드 완료
✅ UNESCO_Ethics_2021.pdf 로드 완료


Failed to send telemetry event ClientStartEvent: capture() takes 1 positional argument but 3 were given
Failed to send telemetry event ClientCreateCollectionEvent: capture() takes 1 positional argument but 3 were given


✅ Baseline 메모리 구축 완료 (2075개 청크)
    - Tavily 검색 중: AI bias 편향성 문제...
    - Tavily 검색 중: AI privacy 편향성 문제...
    - Tavily 검색 중: AI transparency 개인정보보호...
    - Tavily 검색 중: AI privacy 윤리 이슈...
    - Tavily 검색 중: AI bias 윤리 이슈...


Failed to send telemetry event ClientStartEvent: capture() takes 1 positional argument but 3 were given
Failed to send telemetry event ClientCreateCollectionEvent: capture() takes 1 positional argument but 3 were given


✅ 웹 크롤링 완료 (23개 문서)
✅ Issue 메모리 구축 완료 (23개 문서)

   📊 BIAS 리스크 검색 중...
 - 검색된 Baseline 청크: 3개
 - 검색된 Issue 문서: 2개

   📊 PRIVACY 리스크 검색 중...
 - 검색된 Baseline 청크: 3개
 - 검색된 Issue 문서: 2개

   📊 TRANSPARENCY 리스크 검색 중...
 - 검색된 Baseline 청크: 3개
 - 검색된 Issue 문서: 2개

📝 검색된 증거들을 LLM을 사용하여 요약 중...

✅ 증거 수집 및 요약 완료!

📊 증거 수집 결과 요약 및 다음 에이전트 전달 내용
  - 서비스명: 이력서 분석 추천 시스템
  - 가중치: Baseline 0.8 : Issue 0.2
  - 수집된 리스크 카테고리: ['bias', 'privacy', 'transparency']

------------------------------------------------------------
🚨 Risk Assessor에 전달되는 데이터 구조 요약 (첫 번째 증거 예시)
------------------------------------------------------------

[Baseline 근거 예시]
  - 리스크 카테고리: BIAS
  - 출처: UNESCO_Ethics_2021.pdf (페이지 40의 내용)
  - **요약 (핵심 근거):** AI 서비스 이력서 분석 추천 시스템은 UNESCO의 정책 권고사항에 따라 윤리적 기준을 준수해야 하며, 이를 위해 경험 공유 메커니즘과 AI 규제 샌드박스가 필요하다. 이러한 도구들은 AI 관련 주체들이 윤리 리스크를 평가하고 개선할 수 있도록 지원한다. 따라서 시스템의 설계와 운영에서 윤리적 고려가 필수적이다.
  - 원문 내용 (일부): across UNESCO’s areas of competence, an experience-
sharing mechanism, AI regulatory sandbo