In [1]:
import pdfplumber
import os

def convert_pdf_single_file(input_path, output_directory):
    """
    pdfplumber를 사용하여 단일 PDF 파일의 모든 텍스트를 추출하고 텍스트 파일로 저장합니다.

    Args:
        input_path (str): 변환할 PDF 파일의 전체 경로.
        output_directory (str): 변환된 텍스트 파일을 저장할 디렉터리.
    """
    # 출력 디렉터리가 없으면 생성
    if not os.path.exists(output_directory):
        os.makedirs(output_directory)
        print(f"출력 디렉터리가 없어 생성했습니다: {output_directory}")

    # 파일명 설정 (확장자를 .txt로 변경)
    file_name = os.path.basename(input_path)
    base_name, _ = os.path.splitext(file_name)
    output_path = os.path.join(output_directory, f'{base_name}.txt')

    full_text = ""

    try:
        with pdfplumber.open(input_path) as pdf:
            print(f"--- 변환 중: {file_name} ({pdf.pages} 페이지) ---")
            for page in pdf.pages:
                # 페이지의 텍스트 추출. 페이지 번호를 포함하여 구조화 가능
                extracted_text = page.extract_text()
                if extracted_text:
                    full_text += extracted_text + "\n"  # 페이지 간 줄 바꿈 추가

        # 추출된 전체 텍스트를 파일로 저장
        with open(output_path, "w", encoding="utf-8") as f:
            f.write(full_text)

        print(f"✅ 성공: {file_name} -> {output_path}")

    except Exception as e:
        print(f"❌ 오류 발생 ({file_name}): {e}")

# --- 실행 예시 ---
# 변환할 PDF 파일 경로
input_file = '../data/raw/files/대전대학교_대전대학교 2024학년도 다층적 융합 학습경험 플랫폼(MILE) 전.pdf'
# 텍스트 파일을 저장할 디렉터리
output_dir = '../data/processed/datapreprocessingbjs(pdfplumber)/text_single_file'

# 함수 호출
convert_pdf_single_file(input_file, output_dir)

출력 디렉터리가 없어 생성했습니다: ../data/processed/datapreprocessingbjs(pdfplumber)/text_single_file
--- 변환 중: 대전대학교_대전대학교 2024학년도 다층적 융합 학습경험 플랫폼(MILE) 전.pdf ([<Page:1>, <Page:2>, <Page:3>, <Page:4>, <Page:5>, <Page:6>, <Page:7>, <Page:8>, <Page:9>, <Page:10>, <Page:11>, <Page:12>, <Page:13>, <Page:14>, <Page:15>, <Page:16>, <Page:17>, <Page:18>, <Page:19>, <Page:20>, <Page:21>, <Page:22>, <Page:23>, <Page:24>, <Page:25>, <Page:26>, <Page:27>, <Page:28>, <Page:29>, <Page:30>, <Page:31>, <Page:32>, <Page:33>, <Page:34>, <Page:35>, <Page:36>, <Page:37>, <Page:38>, <Page:39>, <Page:40>, <Page:41>, <Page:42>, <Page:43>, <Page:44>, <Page:45>, <Page:46>, <Page:47>, <Page:48>, <Page:49>, <Page:50>, <Page:51>, <Page:52>, <Page:53>, <Page:54>, <Page:55>, <Page:56>, <Page:57>, <Page:58>, <Page:59>, <Page:60>, <Page:61>, <Page:62>, <Page:63>, <Page:64>, <Page:65>, <Page:66>, <Page:67>, <Page:68>, <Page:69>, <Page:70>, <Page:71>, <Page:72>, <Page:73>, <Page:74>, <Page:75>] 페이지) ---
✅ 성공: 대전대학교_대전대학교 2024학년도 다

In [2]:
import pdfplumber
import os

def convert_all_pdfs_in_folder(input_directory, output_directory):
    """
    지정된 디렉터리 내의 모든 PDF 파일을 변환합니다.

    Args:
        input_directory (str): PDF 파일들이 있는 디렉터리 경로.
        output_directory (str): 변환된 텍스트 파일을 저장할 디렉터리.
    """
    # 출력 디렉터리가 없으면 생성
    if not os.path.exists(output_directory):
        os.makedirs(output_directory)
        print(f"출력 디렉터리가 없어 생성했습니다: {output_directory}")

    # 입력 디렉터리 내 모든 파일을 순회하며 PDF 파일만 찾기
    pdf_files = [f for f in os.listdir(input_directory) if f.lower().endswith(".pdf")]
    
    total_files = len(pdf_files)
    success_count = 0
    failed_files = []

    print(f"\n--- PDF 파일 일괄 변환 시작 (총 {total_files}개) ---")

    for pdf_file in pdf_files:
        input_path = os.path.join(input_directory, pdf_file)
        base_name, _ = os.path.splitext(pdf_file)
        output_path = os.path.join(output_directory, f'{base_name}.txt')
        
        full_text = ""

        try:
            with pdfplumber.open(input_path) as pdf:
                for page in pdf.pages:
                    extracted_text = page.extract_text()
                    if extracted_text:
                        full_text += extracted_text + "\n"

            with open(output_path, "w", encoding="utf-8") as f:
                f.write(full_text)

            print(f"✅ 성공: {pdf_file} -> {os.path.relpath(output_path, start='.')}")
            success_count += 1

        except Exception as e:
            print(f"❌ 오류 발생 ({pdf_file}): {e}")
            failed_files.append(pdf_file)

    print("\n--- 변환 작업 완료 ---")
    print(f"성공적으로 변환된 파일 수: {success_count}")
    if failed_files:
        print(f"오류가 발생한 파일 목록: {len(failed_files)}개")
        for f in failed_files:
            print(f"- {f}")

# --- 실행 예시 ---
# PDF 파일들이 있는 입력 폴더 경로
input_folder = '../data/raw/files'
# 변환된 텍스트 파일을 저장할 출력 디렉터리
output_folder = '../data/processed/datapreprocessingbjs(pdfplumber)/text_all_files'

# 함수 호출
convert_all_pdfs_in_folder(input_folder, output_folder)

출력 디렉터리가 없어 생성했습니다: ../data/processed/datapreprocessingbjs(pdfplumber)/text_all_files

--- PDF 파일 일괄 변환 시작 (총 5개) ---
✅ 성공: 기초과학연구원_2025년도 중이온가속기용 극저온시스템 운전 용역.pdf -> ../data/processed/datapreprocessingbjs(pdfplumber)/text_all_files/기초과학연구원_2025년도 중이온가속기용 극저온시스템 운전 용역.txt
✅ 성공: 고려대학교_차세대 포털·학사 정보시스템 구축사업.pdf -> ../data/processed/datapreprocessingbjs(pdfplumber)/text_all_files/고려대학교_차세대 포털·학사 정보시스템 구축사업.txt
✅ 성공: 서울특별시_2024년 지도정보 플랫폼 및 전문활용 연계 시스템 고도화 용.pdf -> ../data/processed/datapreprocessingbjs(pdfplumber)/text_all_files/서울특별시_2024년 지도정보 플랫폼 및 전문활용 연계 시스템 고도화 용.txt
✅ 성공: 서울시립대학교_[사전공개] 학업성취도 다차원 종단분석 통합시스템 1차.pdf -> ../data/processed/datapreprocessingbjs(pdfplumber)/text_all_files/서울시립대학교_[사전공개] 학업성취도 다차원 종단분석 통합시스템 1차.txt
✅ 성공: 대전대학교_대전대학교 2024학년도 다층적 융합 학습경험 플랫폼(MILE) 전.pdf -> ../data/processed/datapreprocessingbjs(pdfplumber)/text_all_files/대전대학교_대전대학교 2024학년도 다층적 융합 학습경험 플랫폼(MILE) 전.txt

--- 변환 작업 완료 ---
성공적으로 변환된 파일 수: 5


In [8]:
import pdfplumber
import os
import re
from typing import List, Dict, Any

def convert_table_to_natural_language(table_rows: List[List[str]]) -> str:
    """
    pdfplumber에서 추출한 표 데이터를 자연어 형태로 변환.
    NoneType 오류를 방지하는 로직이 추가되었습니다.
    """
    if not table_rows or len(table_rows) < 2:
        return ""
    
    # 첫 번째 행을 헤더로 가정 (NoneType 방지)
    headers = [h if h is not None else "" for h in table_rows[0]]
    content_rows = table_rows[1:]
    
    natural_text = []
    
    # '월' 또는 '일정'이 포함된 헤더가 있을 경우 일정표로 간주
    if any('월' in h for h in headers) or any('일정' in h for h in headers):
        natural_text.append("다음은 추진일정표입니다:")
        for row in content_rows:
            row = [cell if cell is not None else "" for cell in row]
            if len(row) > 1 and row[0].strip():
                task = row[0].strip()
                schedule_info = []
                for i, cell in enumerate(row[1:], 1):
                    if cell.strip() and i < len(headers):
                        header = headers[i]
                        schedule_info.append(f"{header}에 {task}")
                if schedule_info:
                    natural_text.append(". ".join(schedule_info))
    else:
        # 일반 표 처리
        for row in content_rows:
            row = [cell if cell is not None else "" for cell in row]
            if len(row) >= len(headers):
                row_text = []
                for i, (header, cell) in enumerate(zip(headers, row)):
                    if cell and cell.strip():
                        row_text.append(f"{header}: {cell.strip()}")
                if row_text:
                    natural_text.append(", ".join(row_text))
    
    return ". ".join(natural_text)

def extract_text_and_tables(file_path: str) -> Dict[str, Any]:
    """
    pdfplumber를 사용하여 PDF 파일에서 텍스트와 표를 추출하고 구조화합니다.
    NoneType 오류를 방지하는 로직이 추가되었습니다.
    """
    print(f"PDF 파일 파싱 시작: {file_path}")
    
    parsed_data = {
        'sections': [],
        'tables': [],
        'metadata': {}
    }
    
    try:
        with pdfplumber.open(file_path) as pdf:
            parsed_data['metadata']['total_pages'] = len(pdf.pages)
            print(f"총 {len(pdf.pages)} 페이지 발견")

            for page_idx, page in enumerate(pdf.pages):
                # 페이지 내 모든 표 추출
                page_tables = page.extract_tables()
                
                # 표 데이터는 별도로 저장
                for i, table in enumerate(page_tables):
                    table_data = {
                        'id': f"table_{page_idx}_{i}",
                        'type': 'table',
                        'rows': table,
                        'raw_content': convert_table_to_natural_language(table),
                        'metadata': { 'page': page_idx + 1 }
                    }
                    if table_data['raw_content']:
                        parsed_data['tables'].append(table_data)

                # 페이지 내 텍스트 추출 (NoneType 방지)
                page_text_with_layout = page.extract_text(x_tolerance=2, y_tolerance=2, layout=True)
                if not page_text_with_layout:
                    continue

                # 텍스트를 문단 단위로 분리
                paragraphs = [para for para in page_text_with_layout.split('\n\n') if para.strip()]
                for i, para in enumerate(paragraphs):
                    parsed_data['sections'].append({
                        'id': f"para_{page_idx}_{i}",
                        'type': 'paragraph',
                        'content': para.strip(),
                        'metadata': { 'page': page_idx + 1 }
                    })

    except Exception as e:
        print(f"PDF 파싱 중 오류 발생: {e}")
    
    parsed_data['metadata']['total_paragraphs'] = len(parsed_data['sections'])
    parsed_data['metadata']['total_tables'] = len(parsed_data['tables'])
    print(f"파싱 완료 - 문단: {len(parsed_data['sections'])}, 표: {len(parsed_data['tables'])}")
    
    return parsed_data

### **청크 생성 및 분석 함수**

def create_structured_chunks(parsed_data: Dict[str, Any], max_chunk_size: int = 1000) -> List[Dict[str, Any]]:
    chunks = []
    
    print("청킹 작업 시작...")
    
    for table in parsed_data['tables']:
        table_chunk = create_table_chunk(table)
        if table_chunk:
            chunks.append(table_chunk)
    
    paragraph_chunks = create_paragraph_chunks(parsed_data['sections'], max_chunk_size)
    chunks.extend(paragraph_chunks)
    
    for i, chunk in enumerate(chunks):
        chunk['chunk_id'] = f"chunk_{i:04d}"
    
    print(f"청킹 완료 - 총 {len(chunks)}개 청크 생성")
    return chunks

def create_table_chunk(table_data: Dict[str, Any]) -> Dict[str, Any]:
    if not table_data.get('raw_content') or not table_data['raw_content'].strip():
        return None
    
    table_type = identify_table_type(table_data)
    
    chunk = {
        'type': 'table',
        'subtype': table_type,
        'content': table_data['raw_content'],
        'original_rows': len(table_data.get('rows', [])),
        'metadata': {
            'table_id': table_data.get('id'),
            'source': 'pdf_table_extraction',
            'priority': get_table_priority(table_type)
        }
    }
    return chunk

def identify_table_type(table_data: Dict[str, Any]) -> str:
    content = table_data.get('raw_content', '').lower()
    rows = table_data.get('rows', [])
    
    if rows and rows[0]:
        header = ' '.join(cell for cell in rows[0] if cell).lower()
        if any(keyword in header for keyword in ['월', '일정', '추진']):
            return 'schedule'
        if any(keyword in header for keyword in ['예산', '비용', '금액', '원']):
            return 'budget'
        if any(keyword in header for keyword in ['담당', '역할', '조직']):
            return 'organization'
    
    if any(keyword in content for keyword in ['일정', '월', '추진']):
        return 'schedule'
    elif any(keyword in content for keyword in ['예산', '비용', '천원']):
        return 'budget'
    else:
        return 'general'

def get_table_priority(table_type: str) -> int:
    priority_map = {
        'schedule': 10,
        'budget': 8,
        'organization': 6,
        'general': 4
    }
    return priority_map.get(table_type, 4)

def create_paragraph_chunks(sections: List[Dict[str, Any]], max_chunk_size: int) -> List[Dict[str, Any]]:
    chunks = []
    current_chunk = []
    current_size = 0
    
    for section in sections:
        content = section.get('content', '')
        if not content.strip():
            continue
        
        estimated_tokens = len(content) * 0.7
        
        if current_size + estimated_tokens <= max_chunk_size:
            current_chunk.append(content)
            current_size += estimated_tokens
        else:
            if current_chunk:
                chunks.append(create_paragraph_chunk(current_chunk))
            
            if estimated_tokens > max_chunk_size:
                split_chunks = split_large_paragraph(content, max_chunk_size)
                chunks.extend(split_chunks)
                current_chunk = []
                current_size = 0
            else:
                current_chunk = [content]
                current_size = estimated_tokens
    
    if current_chunk:
        chunks.append(create_paragraph_chunk(current_chunk))
    
    return chunks

def create_paragraph_chunk(content_list: List[str]) -> Dict[str, Any]:
    combined_content = ' '.join(content_list)
    paragraph_type = identify_paragraph_type(combined_content)
    
    chunk = {
        'type': 'paragraph',
        'subtype': paragraph_type,
        'content': combined_content,
        'paragraph_count': len(content_list),
        'metadata': {
            'source': 'pdf_paragraph_extraction',
            'priority': get_paragraph_priority(paragraph_type)
        }
    }
    return chunk

def identify_paragraph_type(content: str) -> str:
    content_lower = content.lower()
    
    if any(keyword in content_lower for keyword in ['제', '장', '절', '항']):
        return 'header'
    if any(keyword in content_lower for keyword in ['목적', '개요', '배경']):
        return 'overview'
    if any(keyword in content_lower for keyword in ['결론', '요약', '종합']):
        return 'conclusion'
    if any(keyword in content_lower for keyword in ['방법', '절차', '과정']):
        return 'methodology'
    
    return 'general'

def get_paragraph_priority(paragraph_type: str) -> int:
    priority_map = {
        'overview': 9,
        'conclusion': 8,
        'methodology': 7,
        'header': 6,
        'general': 5
    }
    return priority_map.get(paragraph_type, 5)

def split_large_paragraph(content: str, max_chunk_size: int) -> List[Dict[str, Any]]:
    chunks = []
    sentences = re.split(r'[.!?]\s+', content)
    
    current_chunk = []
    current_size = 0
    
    for sentence in sentences:
        estimated_tokens = len(sentence) * 0.7
        
        if current_size + estimated_tokens <= max_chunk_size:
            current_chunk.append(sentence)
            current_size += estimated_tokens
        else:
            if current_chunk:
                chunk_content = '. '.join(current_chunk) + '.'
                chunks.append({
                    'type': 'paragraph',
                    'subtype': 'split_general',
                    'content': chunk_content,
                    'paragraph_count': 1,
                    'metadata': {
                        'source': 'paragraph_split',
                        'priority': 5
                    }
                })
            current_chunk = [sentence]
            current_size = estimated_tokens
    
    if current_chunk:
        chunk_content = '. '.join(current_chunk) + '.'
        chunks.append({
            'type': 'paragraph',
            'subtype': 'split_general',
            'content': chunk_content,
            'paragraph_count': 1,
            'metadata': {
                'source': 'paragraph_split',
                'priority': 5
            }
        })
    return chunks

def analyze_chunks(chunks: List[Dict[str, Any]]) -> None:
    print("\n=== 청크 분석 결과 ===")
    
    type_counts = {}
    subtype_counts = {}
    
    for chunk in chunks:
        chunk_type = chunk['type']
        chunk_subtype = chunk['subtype']
        
        type_counts[chunk_type] = type_counts.get(chunk_type, 0) + 1
        subtype_counts[chunk_subtype] = subtype_counts.get(chunk_subtype, 0) + 1
    
    print("타입별 분포:")
    for chunk_type, count in type_counts.items():
        print(f"  {chunk_type}: {count}")
    
    print("\n세부 타입별 분포:")
    for subtype, count in subtype_counts.items():
        print(f"  {subtype}: {count}")
    
    priority_counts = {}
    for chunk in chunks:
        priority = chunk['metadata']['priority']
        priority_counts[priority] = priority_counts.get(priority, 0) + 1
    
    print("\n우선순위별 분포:")
    for priority in sorted(priority_counts.keys(), reverse=True):
        print(f"  우선순위 {priority}: {priority_counts[priority]}개")

def main_pdf_processing():
    pdf_file_path = '../data/raw/files/대전대학교_대전대학교 2024학년도 다층적 융합 학습경험 플랫폼(MILE) 전.pdf'
    
    if not os.path.exists(pdf_file_path):
        print(f"파일을 찾을 수 없습니다: {pdf_file_path}")
        return
        
    parsed_pdf_data = extract_text_and_tables(pdf_file_path)
    pdf_chunks = create_structured_chunks(parsed_pdf_data)
    
    analyze_chunks(pdf_chunks)
    
    print("\n=== 샘플 청크 ===")
    table_chunks = [c for c in pdf_chunks if c.get('type') == 'table']
    if table_chunks:
        print("첫 번째 표 청크:")
        sample_table = table_chunks[0]
        print(f"  타입: {sample_table['type']}-{sample_table['subtype']}")
        print(f"  내용: {sample_table['content'][:200]}...")
        print(f"  우선순위: {sample_table['metadata']['priority']}")
    
    paragraph_chunks = [c for c in pdf_chunks if c['type'] == 'paragraph']
    if paragraph_chunks:
        print("\n첫 번째 문단 청크:")
        sample_paragraph = paragraph_chunks[0]
        print(f"  타입: {sample_paragraph['type']}-{sample_paragraph['subtype']}")
        print(f"  내용: {sample_paragraph['content'][:200]}...")
        print(f"  문단 수: {sample_paragraph['paragraph_count']}")

if __name__ == "__main__":
    main_pdf_processing()

PDF 파일 파싱 시작: ../data/raw/files/대전대학교_대전대학교 2024학년도 다층적 융합 학습경험 플랫폼(MILE) 전.pdf
총 75 페이지 발견
파싱 완료 - 문단: 75, 표: 51
청킹 작업 시작...
청킹 완료 - 총 254개 청크 생성

=== 청크 분석 결과 ===
타입별 분포:
  table: 51
  paragraph: 203

세부 타입별 분포:
  organization: 1
  general: 30
  budget: 4
  schedule: 16
  split_general: 202
  header: 1

우선순위별 분포:
  우선순위 10: 16개
  우선순위 8: 4개
  우선순위 6: 2개
  우선순위 5: 202개
  우선순위 4: 30개

=== 샘플 청크 ===
첫 번째 표 청크:
  타입: table-organization
  내용: 문의: 입찰, 소속 / 담당자: 총무팀 이병석, 전화: 042-280-2163. 문의: 사업, 소속 / 담당자: 대학교육혁신원 교수학습개발센터
홍나래, 전화: 042-280-4036...
  우선순위: 6

첫 번째 문단 청크:
  타입: paragraph-split_general
  내용: (재공고)대전대학교                    다층적         융합      학습경험                  
                                                                                  
                                            ...
  문단 수: 1


In [10]:
import json

# 최종 청크 데이터를 JSON 파일로 저장하는 함수
def save_chunks_to_json(chunks_data: List[Dict], output_file_path: str):
    """
    모든 청크 데이터를 하나의 JSON 파일로 저장합니다.
    """
    try:
        with open(output_file_path, 'w', encoding='utf-8') as f:
            json.dump(chunks_data, f, ensure_ascii=False, indent=4)
        print(f"✅ 모든 청크 데이터를 {output_file_path}에 성공적으로 저장했습니다.")
    except Exception as e:
        print(f"❌ JSON 파일 저장 중 오류가 발생했습니다: {e}")

# 최종 실행 함수: 모든 PDF 파일을 처리하고 JSON으로 저장
def process_all_pdfs_and_save_json(input_dir, output_dir, json_file_name="all_pdf_chunks.json"):
    """
    지정된 입력 디렉터리의 모든 PDF 파일을 파싱하고 청킹한 후,
    결과를 단일 JSON 파일로 저장합니다.
    """
    # 출력 디렉터리 생성
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
        print(f"출력 디렉터리가 없어 생성했습니다: {output_dir}")

    all_chunks = []
    file_list = [f for f in os.listdir(input_dir) if f.lower().endswith('.pdf')]
    total_files = len(file_list)
    
    print(f"\n--- 총 {total_files}개 PDF 파일 일괄 처리 시작 ---")

    for i, file_name in enumerate(file_list):
        input_path = os.path.join(input_dir, file_name)
        
        print(f"\n[{i+1}/{total_files}] 파일 처리 중: {file_name}")
        
        # PDF 파일 파싱 (오류 방지 로직 적용)
        parsed_data = extract_text_and_tables(input_path)
        
        # 파싱된 데이터를 청크로 변환
        if parsed_data:
            chunks_for_file = create_structured_chunks(parsed_data)
            
            # 각 청크에 파일명 메타데이터 추가
            for chunk in chunks_for_file:
                chunk['metadata']['source_file'] = file_name
            
            all_chunks.extend(chunks_for_file)
            print(f"✅ {file_name}에서 {len(chunks_for_file)}개 청크 생성 완료.")
    
    final_output_path = os.path.join(output_dir, json_file_name)
    save_chunks_to_json(all_chunks, final_output_path)
    print(f"\n--- 모든 파일 처리 및 JSON 저장 완료 ---")
    print(f"총 {len(all_chunks)}개 청크가 생성되어 {final_output_path}에 저장되었습니다.")

# --- 실행 예시 ---
input_folder = '../data/raw/files'
output_folder = '../data/processed/datapreprocessingbjs(pdfplumber)'

process_all_pdfs_and_save_json(input_folder, output_folder)


--- 총 5개 PDF 파일 일괄 처리 시작 ---

[1/5] 파일 처리 중: 기초과학연구원_2025년도 중이온가속기용 극저온시스템 운전 용역.pdf
PDF 파일 파싱 시작: ../data/raw/files/기초과학연구원_2025년도 중이온가속기용 극저온시스템 운전 용역.pdf
총 49 페이지 발견
파싱 완료 - 문단: 49, 표: 52
청킹 작업 시작...
청킹 완료 - 총 192개 청크 생성
✅ 기초과학연구원_2025년도 중이온가속기용 극저온시스템 운전 용역.pdf에서 192개 청크 생성 완료.

[2/5] 파일 처리 중: 고려대학교_차세대 포털·학사 정보시스템 구축사업.pdf
PDF 파일 파싱 시작: ../data/raw/files/고려대학교_차세대 포털·학사 정보시스템 구축사업.pdf
총 297 페이지 발견
파싱 완료 - 문단: 297, 표: 377
청킹 작업 시작...
청킹 완료 - 총 808개 청크 생성
✅ 고려대학교_차세대 포털·학사 정보시스템 구축사업.pdf에서 808개 청크 생성 완료.

[3/5] 파일 처리 중: 서울특별시_2024년 지도정보 플랫폼 및 전문활용 연계 시스템 고도화 용.pdf
PDF 파일 파싱 시작: ../data/raw/files/서울특별시_2024년 지도정보 플랫폼 및 전문활용 연계 시스템 고도화 용.pdf
총 75 페이지 발견
파싱 완료 - 문단: 75, 표: 151
청킹 작업 시작...
청킹 완료 - 총 405개 청크 생성
✅ 서울특별시_2024년 지도정보 플랫폼 및 전문활용 연계 시스템 고도화 용.pdf에서 405개 청크 생성 완료.

[4/5] 파일 처리 중: 서울시립대학교_[사전공개] 학업성취도 다차원 종단분석 통합시스템 1차.pdf
PDF 파일 파싱 시작: ../data/raw/files/서울시립대학교_[사전공개] 학업성취도 다차원 종단분석 통합시스템 1차.pdf
총 149 페이지 발견
파싱 완료 - 문단: 149, 표: 163
청킹 작업 시작...
청킹 완료 - 총 563개 청크 생성
✅ 서울시립대학교_[사

In [112]:
import chromadb
from sentence_transformers import SentenceTransformer
from tqdm import tqdm
import json
import os

def load_chunks_from_json(file_path):
    """JSON 파일에서 청크 데이터를 불러옵니다."""
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        print(f"✅ {file_path}에서 {len(data)}개의 청크를 성공적으로 불러왔습니다.")
        return data
    except FileNotFoundError:
        print(f"❌ 오류: 파일을 찾을 수 없습니다: {file_path}")
        return None
    except json.JSONDecodeError:
        print(f"❌ 오류: JSON 디코딩에 실패했습니다. 파일 형식을 확인해주세요.")
        return None

# JSON 파일 경로 설정 및 데이터 로드
json_file_path = '../data/processed/datapreprocessingbjs(pdfplumber)/all_pdf_chunks.json'
chunks = load_chunks_from_json(json_file_path)

if chunks is None:
    # 데이터 로드 실패 시, 스크립트 종료
    exit()

✅ ../data/processed/datapreprocessingbjs(pdfplumber)/all_pdf_chunks.json에서 2222개의 청크를 성공적으로 불러왔습니다.


In [113]:
# chunks 리스트의 각 청크를 순회하며 청크 ID 고유화
for chunk in chunks:
    # 메타데이터에서 원본 파일 이름 가져오기, 없으면 "unknown" 사용
    file_name = chunk['metadata'].get("source_file", "unknown")
    
    # 기존 chunk_id에 파일 이름을 붙여서 고유 ID 생성
    chunk['chunk_id'] = f"{file_name}_{chunk['chunk_id']}"

# ChromaDB 클라이언트 초기화
client = chromadb.Client()

# PersistentClient 적용
db_path = "../data/processed/datapreprocessingbjs(pdfplumber)/chromaDB"  # 🔹 DB 저장 경로
client = chromadb.PersistentClient(path=db_path)  # 🔹 메모리 대신 디스크 기반 DB 사용

# 임베딩 모델 지정
model_name = "snunlp/KR-SBERT-V40K-klueNLI-augSTS"
try:
    model = SentenceTransformer(model_name)
    print(f"✅ 임베딩 모델 '{model_name}'을 성공적으로 로드했습니다.")
except OSError as e:
    print(f"❌ 모델 로드 중 오류 발생: {e}")
    exit()

def get_or_create_collection(collection_name):
    """컬렉션을 가져오거나 새로 생성합니다."""
    try:
        collection = client.get_collection(name=collection_name)
        print(f"✅ 기존 컬렉션 '{collection_name}'을 불러왔습니다.")
        return collection
    except Exception:
        collection = client.create_collection(name=collection_name)
        print(f"✨ 새로운 컬렉션 '{collection_name}'을 생성했습니다.")
        return collection

# ChromaDB 컬렉션 설정
collection_name = "pdf_rag_collection_snunlp"
collection = get_or_create_collection(collection_name)

# 기존 데이터가 있을 경우 삭제 (테스트용)
if collection.count() > 0:
    print(f"⚠️ 기존 데이터 {collection.count()}개를 삭제합니다.")
    client.delete_collection(collection_name)
    collection = client.create_collection(name=collection_name)

✅ 임베딩 모델 'snunlp/KR-SBERT-V40K-klueNLI-augSTS'을 성공적으로 로드했습니다.
✨ 새로운 컬렉션 'pdf_rag_collection_snunlp'을 생성했습니다.


In [114]:
def embed_and_save_to_chroma(chunks, collection, model):
    """청크 데이터를 임베딩하여 ChromaDB에 저장합니다."""
    print("🚀 청크 임베딩 및 ChromaDB 저장 시작...")
    
    # 데이터를 ChromaDB에 맞게 변환
    documents = [chunk['content'] for chunk in chunks]
    metadatas = [chunk['metadata'] for chunk in chunks]
    ids = [chunk['chunk_id'] for chunk in chunks]
    
    # 50개씩 일괄 처리
    batch_size = 50
    total_chunks = len(documents)
    
    for i in tqdm(range(0, total_chunks, batch_size), desc="ChromaDB 저장 중"):
        batch_documents = documents[i:i + batch_size]
        batch_metadatas = metadatas[i:i + batch_size]
        batch_ids = ids[i:i + batch_size]
        
        # 모델을 사용하여 임베딩 벡터 생성
        embeddings = model.encode(batch_documents, show_progress_bar=False).tolist()
        
        # 임베딩과 함께 데이터 저장
        collection.add(
            embeddings=embeddings,
            documents=batch_documents,
            metadatas=batch_metadatas,
            ids=batch_ids
        )
    
    print(f"✅ 총 {collection.count()}개의 청크가 ChromaDB에 성공적으로 저장되었습니다.")

# 최종 실행 함수 호출
embed_and_save_to_chroma(chunks, collection, model)

🚀 청크 임베딩 및 ChromaDB 저장 시작...


ChromaDB 저장 중: 100%|██████████| 45/45 [00:18<00:00,  2.38it/s]

✅ 총 2222개의 청크가 ChromaDB에 성공적으로 저장되었습니다.





In [115]:
# ChromaDB 컬렉션 로드
collection = client.get_collection(name=collection_name)

# 임베딩 모델 로드 (저장 시 사용한 모델과 동일해야 함)
model = SentenceTransformer("snunlp/KR-SBERT-V40K-klueNLI-augSTS")

In [116]:
import torch
from openai import OpenAI

# OpenAI API 키 설정 (또는 다른 LLM API 키)
openai_api_key ='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'
client_llm = OpenAI(api_key=openai_api_key)
print("✅ OpenAI LLM 클라이언트 초기화 완료")

✅ OpenAI LLM 클라이언트 초기화 완료


In [117]:
def retrieval_augmented_generation(query: str):
    """
    사용자의 질문에 RAG 방식으로 답변하는 함수입니다.
    
    Args:
        query (str): 사용자의 질문
    
    Returns:
        str: LLM이 생성한 답변
    """
    print("1. 검색(Retrieval) 시작...")
    # 1.1. 질문을 임베딩
    query_embedding = model.encode([query]).tolist()
    
    # 1.2. ChromaDB에서 가장 유사한 청크 검색
    # n_results를 늘려 더 많은 문서를 가져올 수 있습니다.
    results = collection.query(
        query_embeddings=query_embedding,
        n_results=10,  # 상위 10개 결과를 가져옴
    )
    
    # 1.3. 검색된 문서(청크) 추출
    retrieved_chunks = results['documents'][0]
    retrieved_metas = results['metadatas'][0]
    print(f"✅ 검색 완료: {len(retrieved_chunks)}개의 관련 문서 발견")
    
    print("2. 보강(Augmentation) 시작...")
    # 2.1. 검색된 청크를 프롬프트에 포함
    context = ""
    for doc, meta in zip(retrieved_chunks, retrieved_metas):
        file_name = meta.get("source_file", "unknown")
        context += f"[출처: {file_name}]\n{doc}\n\n"
    
    # 2.2. 시스템 프롬프트 구성
    system_prompt = f"""
    당신은 주어진 문서를 기반으로 질문에 답변하는 AI 비서입니다.
    문서 내용을 요약하고, 반드시 문서에서 제공하는 정보만 바탕으로 답변하세요.
    문서에 관련된 내용이 확실히 없는 경우에만 '문서에 관련 정보가 없습니다.'라고 답변하세요.

    [검색된 문서]
    {context}
    """
    
    print("3. 생성(Generation) 시작...")
    # 3.1. LLM에 질문 전달 및 답변 생성
    response = client_llm.chat.completions.create(
        model="gpt-4.1-mini",  # 또는 "gpt-3.5-turbo" 등
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": query}
        ],
        temperature=0.7,
        max_tokens=700
    )
    
    return response.choices[0].message.content

In [118]:
# --- 사용 예시 ---
user_query = "대전대학교의 다층적 융합 학습경험 플랫폼에 대해 자세히 알려줘."
answer = retrieval_augmented_generation(user_query)

print("\n--- 질문 ---")
print(user_query)
print("\n--- 답변 ---")
print(answer)

1. 검색(Retrieval) 시작...
✅ 검색 완료: 10개의 관련 문서 발견
2. 보강(Augmentation) 시작...
3. 생성(Generation) 시작...

--- 질문 ---
대전대학교의 다층적 융합 학습경험 플랫폼에 대해 자세히 알려줘.

--- 답변 ---
대전대학교의 다층적 융합 학습경험 플랫폼(MILE)은 융합 교육 혁신과 학생 맞춤형 학습 지원을 목표로 구축되는 시스템입니다. 주요 내용은 다음과 같습니다.

1. 사업 목표  
- 융합 교육 학습 성과를 실시간으로 추적하고 피드백을 강화  
- 경계 없는 학생 맞춤형 학습경로 제공을 통한 융합 교육 확산  
- 유연한 학사 운영과 융합 교육 프로그램 지원  

2. 시스템 및 사업 범위  
- ‘나노 마이크로디그리 전산시스템’과 ‘적응형 교수·학습 시스템’과 연계  
- 교수 및 융합교육 담당 코디네이터와 학생이 활용할 수 있는 융합 교육 모니터링 시스템 신규 구축  
- 나노·마이크로디그리 출력 화면 조정 및 교수 융합교육 역량 진단 기능 포함  
- 학생 중심 교육 기반인 ‘ACT 교과인증 확대’를 대비한 학사 강좌개설 연계 보완 및 추가 개선  

3. 필요성 및 운영 방향  
- 학생들이 자율적으로 학습 경로를 선택할 수 있도록 개방적 융합 교육 운영과 성과 관리 체계 마련  
- 무경계 교육과정 참여 학생 지원 및 융합 교육 활성화를 위한 교수-학생 활용 플랫폼 구축  
- 전주기적 학생 지원을 위한 제반 환경 조성  

요약하면, MILE 플랫폼은 대전대학교가 융합 교육을 체계적이고 실시간으로 지원하며 학생 개개인 맞춤형 학습을 가능하게 하는 다층적 통합 학습관리 시스템으로, 기존의 나노·마이크로디그리 시스템과 적응형 학습 시스템을 연계 보완하여 융합 교육의 혁신과 확산을 도모하는 사업입니다.


In [99]:
import os
file_path = "../data/processed/datapreprocessingbjs(pdfplumber)/all_pdf_chunks.json"
print(os.path.getsize(file_path) / 1024, "KB")

4901.6904296875 KB


In [100]:
with open(file_path, "r", encoding="utf-8") as f:
    for i in range(5):  # 처음 5줄만
        print(f.readline())

[

    {

        "type": "table",

        "subtype": "general",

        "content": "문서번호: 개정번호, -: 0. 문서번호: 발 행 일, -: 2024. 10. 30",



In [105]:
import json

with open(file_path, "r", encoding="utf-8") as f:
    data = json.load(f)

print("총 개수:", len(data))
print("data 타입:", type(data))

총 개수: 2222
data 타입: <class 'list'>


In [106]:
if isinstance(data, list):
    print("첫 번째 항목:", data[0].keys())
elif isinstance(data, dict):
    print("keys:", list(data.keys()))

첫 번째 항목: dict_keys(['type', 'subtype', 'content', 'original_rows', 'metadata', 'chunk_id'])


In [102]:
# 첫 3개 확인
for i, chunk in enumerate(data[:3], 1):
    print(f"\n=== Chunk {i} ===")
    for k, v in chunk.items():
        if isinstance(v, str) and len(v) > 100:
            print(f"{k}: {v[:100]}...")  # 너무 긴 문자열은 앞부분만
        else:
            print(f"{k}: {v}")


=== Chunk 1 ===
type: table
subtype: general
content: 문서번호: 개정번호, -: 0. 문서번호: 발 행 일, -: 2024. 10. 30
original_rows: 3
metadata: {'table_id': 'table_1_0', 'source': 'pdf_table_extraction', 'priority': 4, 'source_file': '기초과학연구원_2025년도 중이온가속기용 극저온시스템 운전 용역.pdf'}
chunk_id: chunk_0000

=== Chunk 2 ===
type: table
subtype: general
content: 문서상태 식별 (다음 중 하나에 체크): □ 문서상태-1 : 최종설계, 해석, 구매 등에 이용되는 설계 확인된 정보. 문서상태 식별 (다음 중 하나에 체크): □ 문서상태-2 : ...
original_rows: 4
metadata: {'table_id': 'table_1_1', 'source': 'pdf_table_extraction', 'priority': 4, 'source_file': '기초과학연구원_2025년도 중이온가속기용 극저온시스템 운전 용역.pdf'}
chunk_id: chunk_0001

=== Chunk 3 ===
type: table
subtype: general
content: : 작 성, 소속 및 직위: 극저온팀/연구위원, 성 명: 유정현. : 검 토, 소속 및 직위: 품질관리실/연구위원, 성 명: 신일경. 소속 및 직위: 극저온팀/팀장, 성 명: 신재...
original_rows: 5
metadata: {'table_id': 'table_1_2', 'source': 'pdf_table_extraction', 'priority': 4, 'source_file': '기초과학연구원_2025년도 중이온가속기용 극저온시스템 운전 용역.pdf'}
chunk_id: chunk_0002


In [46]:
import shutil

# 삭제할 폴더 경로를 지정하세요.
folder_path = 'my_chroma_db'
try:
    shutil.rmtree(folder_path)
    print(f"'{folder_path}' 폴더와 모든 내용이 성공적으로 삭제되었습니다.")
except OSError as e:
    print(f"오류: {e.strerror} - {e.filename}")

'my_chroma_db' 폴더와 모든 내용이 성공적으로 삭제되었습니다.
