1. 필요한 라이브러리 임포트

In [1]:
import pandas as pd
import numpy as np
import re
import os
import time
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
import warnings
from tqdm.notebook import tqdm
from datetime import datetime, timedelta
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import nltk
from nltk.util import ngrams
import hashlib
import concurrent.futures
import traceback
import json
import gc
import openpyxl
from openpyxl.styles import Font, PatternFill, Alignment
from openpyxl.formatting.rule import CellIsRule
from openpyxl.utils import get_column_letter

# 경고 메시지 설정
warnings.filterwarnings('ignore')

# NLTK 필요 패키지 다운로드
nltk.download('punkt', quiet=True)

# 기본 경로 설정
BASE_PATH = Path(r"C:\Users\markcloud\Desktop\오준호\IBK")

# 디버깅 모드 활성화
DEBUG_MODE = True
DEBUG_FILE = "debug_log.txt"

# 오류 날짜 목록 - 이 날짜들은 검사에서 제외됩니다
ERROR_DATES = ['2024-11-30', '2020-12-31']

# 디버깅 로그 함수
def log_debug(message):
    if DEBUG_MODE:
        with open(DEBUG_FILE, "a", encoding="utf-8") as f:
            f.write(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - {message}\n")
            
# 디버그 로그 파일 초기화
with open(DEBUG_FILE, "w", encoding="utf-8") as f:
    f.write(f"=== 디버그 로그 시작: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} ===\n")

2. 파일 이름 분석 및 사전 필터링 함수

In [2]:
def has_keywords_in_name(filename):
    """파일명에 준법감시 관련 키워드가 있는지 확인"""
    keywords = ['유효기간', '준법감시', '심의필', '만료일', '법무', '결재', '승인']
    return any(keyword in filename for keyword in keywords)

def extract_expiry_from_filename_improved(filename):
    """파일명에서 유효기간 정보를 추출 - 개선된 버전"""
    import re
    from datetime import datetime
    
    log_debug(f"파일명 분석 시작: {filename}")
    
    # IBK 파일명 패턴에 맞는 정규식
    ibk_patterns = [
        r'유효기간\(([0-9]{4})[\.\\-]([0-9]{2})[\.\\-]([0-9]{2})\)',  # 유효기간(2025.08.20)
        r'유효기간\(([0-9]{4})([0-9]{2})([0-9]{2})\)',  # 유효기간(20250820)
        r'유효기간\(([0-9]{4})[-/.]([0-9]{1,2})[-/.]([0-9]{1,2})\)',  # 유효기간(2025-08-20)
        r'제[0-9]{4}-[0-9]+호\([^)]+\)\s*유효기간\((\d{4})[./-](\d{1,2})[./-](\d{1,2})\)',  # 제2024-4806호(날짜) 유효기간(2025.08.20)
        r'유효기간[_\s]([0-9]{4})[./-]([0-9]{1,2})[./-]([0-9]{1,2})',  # 유효기간_2025.08.20
        r'([0-9]{4})[./-]([0-9]{1,2})[./-]([0-9]{1,2})[_\s]유효기간',  # 2025.08.20_유효기간
        r'만료일[_\s]([0-9]{4})[./-]([0-9]{1,2})[./-]([0-9]{1,2})'   # 만료일_2025.08.20
    ]
    
    # 패턴 검색
    for i, pattern in enumerate(ibk_patterns):
        match = re.search(pattern, filename)
        if match:
            try:
                # 패턴에 따라 그룹 인덱스 조정
                if '제' in pattern and len(match.groups()) >= 3:  # 제2024-4806호 형식
                    year, month, day = map(int, match.groups()[-3:])
                else:  # 일반 형식
                    year, month, day = map(int, match.groups())
                    
                if 2000 <= year <= 2100 and 1 <= month <= 12 and 1 <= day <= 31:
                    expiry_date = datetime(year, month, day)
                    log_debug(f"유효기간 추출 성공: {expiry_date.strftime('%Y-%m-%d')} (패턴 {i+1})")
                    
                    # 오류 날짜 필터링
                    date_str = expiry_date.strftime('%Y-%m-%d')
                    if date_str in ERROR_DATES:
                        log_debug(f"오류 날짜 감지: {date_str}")
                        continue
                        
                    return expiry_date
            except (ValueError, IndexError) as e:
                log_debug(f"패턴 {i+1} 매칭 오류: {str(e)}")
                continue
    
    # 확장 패턴 시도 (날짜 형식이 다른 경우)
    extended_patterns = [
        r'유효기간[:\s]*([12]\d{3})[년\.\-]?([01]?\d)[월\.\-]?([0-3]?\d)[일]?',  # 유효기간: 2025년6월3일
        r'([12]\d{3})년?[^0-9]+([01]?\d)월?[^0-9]+([0-3]?\d)일?[^0-9]*유효',  # 2025년 6월 3일 유효
        r'유효[^0-9]*([12]\d{3})년?[^0-9]+([01]?\d)월?[^0-9]+([0-3]?\d)일?'   # 유효 2025년 6월 3일
    ]
    
    for i, pattern in enumerate(extended_patterns):
        match = re.search(pattern, filename)
        if match:
            try:
                groups = match.groups()
                log_debug(f"확장 패턴 {i+1} 매치: {pattern} -> {groups}")
                
                year, month, day = map(int, groups)
                if 2000 <= year <= 2100 and 1 <= month <= 12 and 1 <= day <= 31:
                    expiry_date = datetime(year, month, day)
                    date_str = expiry_date.strftime('%Y-%m-%d')
                    
                    if date_str in ERROR_DATES:
                        log_debug(f"오류 날짜 감지: {date_str}")
                        continue
                        
                    log_debug(f"확장 패턴으로 유효기간 추출 성공: {date_str}")
                    return expiry_date
            except (ValueError, IndexError) as e:
                log_debug(f"확장 패턴 {i+1} 매칭 오류: {str(e)}")
                continue
    
    log_debug(f"파일명에서 유효기간 추출 실패: {filename}")
    return None

def extract_info_from_filename(filename):
    """파일명에서 준법감시 정보 추출 (준법감시필 번호 + 유효기간)"""
    # 유효기간 추출
    expiry_date = extract_expiry_from_filename_improved(filename)
    
    # 준법감시필 번호 추출
    compliance_number = extract_compliance_number(filename)
    
    # 유효기간 상태 계산
    if expiry_date:
        today = datetime.now().date()
        expiry_date_obj = expiry_date.date()
        days_to_expiry = (expiry_date_obj - today).days
        
        if days_to_expiry < 0:
            status = '만료됨'
        elif days_to_expiry <= 30:
            status = '30일 이내 만료'
        else:
            status = '유효함'
    else:
        days_to_expiry = None
        status = '상태 불명'
    
    return {
        'filename': filename,
        'expiry_date': expiry_date.strftime('%Y-%m-%d') if expiry_date else None,
        'compliance_number': compliance_number,
        'days_to_expiry': days_to_expiry,
        'status': status,
        'source': '파일명'
    }

def extract_compliance_number(filename):
    """파일명에서 준법감시필 번호 추출"""
    log_debug(f"준법감시필 번호 추출 시작: {filename}")
    
    compliance_patterns = [
        r'제(\d{4})-(\d+)호',  # 제2024-4806호
        r'준법감시[_\-\s]*(\d{4})[_\-\s]*(\d+)',  # 준법감시-2024-123
        r'심의필[_\-\s]*(\d{4})[_\-\s]*(\d+)',    # 심의필-2024-123
        r'준법[_\-\s]*(\d{4})[_\-\s]*(\d+)'       # 준법-2024-123
    ]
    
    for i, pattern in enumerate(compliance_patterns):
        match = re.search(pattern, filename)
        if match:
            try:
                year, number = match.groups()
                
                # 형식에 따라 준법감시필 번호 생성
                if pattern.startswith(r'제'):
                    compliance_number = f"제{year}-{number}호"
                elif pattern.startswith(r'준법감시'):
                    compliance_number = f"준법감시-{year}-{number}"
                elif pattern.startswith(r'심의필'):
                    compliance_number = f"심의필-{year}-{number}"
                else:
                    compliance_number = f"준법-{year}-{number}"
                
                log_debug(f"준법감시필 번호 추출 성공: {compliance_number} (패턴 {i+1})")
                return compliance_number
            except Exception as e:
                log_debug(f"준법감시필 번호 추출 오류: {str(e)}")
                continue
    
    log_debug(f"파일명에서 준법감시필 번호 추출 실패: {filename}")
    return None

3. 문서 로드 및 전처리 함수

In [3]:
def read_file_content(file_path):
    """다양한 파일 형식에서 텍스트 추출 - 성능 개선"""
    content = ""
    ext = file_path.suffix.lower()
    
    try:
        # 1. 파일 크기 확인 - 대용량 파일은 제한적으로 처리
        file_size_mb = file_path.stat().st_size / (1024 * 1024)
        if file_size_mb > 200:  # 200MB 이상은 최소한만 처리
            log_debug(f"대용량 파일 감지: {file_path.name} ({file_size_mb:.2f}MB) - 제한적 처리")
            
            # 텍스트 파일일 경우 앞부분만 읽기
            if ext == '.txt':
                with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
                    return f.read(10000)  # 처음 1만자만 읽기
            
            # 다른 대용량 파일은 건너뛰기
            return f"[대용량 파일: {file_path.name} - 건너뜀]"
        
        # Word 문서
        elif ext == '.docx':
            try:
                import docx
                doc = docx.Document(file_path)
                paragraphs = doc.paragraphs[:100]  # 최대 100개 문단만 처리
                content = "\n".join([p.text for p in paragraphs if p.text])
            except:
                pass
        
        # Excel 파일
        elif ext in ['.xlsx', '.xls']:
            try:
                xl = pd.ExcelFile(file_path, engine='openpyxl')
                sheet_names = xl.sheet_names[:3]  # 최대 3개 시트만 처리
                
                all_text = []
                for sheet_name in sheet_names:
                    try:
                        sheet_df = pd.read_excel(file_path, sheet_name=sheet_name, engine='openpyxl', nrows=1000)
                        # 텍스트 열만 추출
                        text_cols = [col for col in sheet_df.columns if sheet_df[col].dtype == 'object']
                        if text_cols:
                            sheet_text = '\n'.join(sheet_df[text_cols].fillna('').astype(str).apply(' '.join, axis=1).tolist())
                            all_text.append(sheet_text)
                    except:
                        continue
                
                content = '\n'.join(all_text)
            except:
                pass
        
        # PowerPoint 파일
        elif ext == '.pptx':
            try:
                from pptx import Presentation
                prs = Presentation(file_path)
                slide_texts = []
                for slide in list(prs.slides)[:20]:  # 최대 20개 슬라이드
                    for shape in slide.shapes:
                        if hasattr(shape, "text"):
                            slide_texts.append(shape.text)
                content = "\n".join(slide_texts)
            except:
                pass
        
        # HWP 파일
        elif ext == '.hwp':
            try:
                import olefile
                if olefile.isOleFile(str(file_path)):
                    with olefile.OleFile(str(file_path)) as ole:
                        if ole.exists('PrvText'):
                            prv_text = ole.openstream('PrvText')
                            content = prv_text.read().decode('utf-16-le', errors='replace')
                            prv_text.close()
            except:
                pass
                
        # PDF 파일
        elif ext == '.pdf':
            try:
                import PyPDF2
                with open(file_path, 'rb') as file:
                    # 더 엄격한 오류 처리 추가
                    try:
                        reader = PyPDF2.PdfReader(file, strict=False)
                        
                        # 최대 5페이지만 처리 (빠른 처리를 위해)
                        max_pages = min(5, len(reader.pages))
                        for page_num in range(max_pages):
                            try:
                                page_text = reader.pages[page_num].extract_text()
                                if page_text:
                                    content += page_text + "\n"
                            except Exception as e:
                                # 페이지별 오류 무시하고 계속 진행
                                continue
                    except Exception as e:
                        # PDF 처리 오류 무시
                        pass
            except:
                pass
        
    except Exception as e:
        log_debug(f"파일 {file_path.name} 읽기 오류: {str(e)}")
        return ""

def process_hwp_safely(file_path):
    """HWP 파일을 안전하게 처리하는 함수"""
    content = ""
    
    # 방법 1: olefile 사용
    try:
        import olefile
        if olefile.isOleFile(str(file_path)):
            try:
                ole = olefile.OleFile(str(file_path))
                try:
                    if ole.exists('PrvText'):
                        prv_text = ole.openstream('PrvText')
                        content = prv_text.read().decode('utf-16-le', errors='replace')
                        prv_text.close()
                    else:
                        content = f"[HWP 파일: {file_path.name} - 미리보기 없음]"
                finally:
                    ole.close()
                if content:
                    return content
            except Exception as e:
                pass
    except:
        pass
    
    # 방법 2: hwp.txt 존재 확인
    try:
        txt_path = file_path.with_suffix('.txt')
        if txt_path.exists():
            with open(txt_path, 'r', encoding='utf-8', errors='replace') as f:
                content = f.read()
                if content:
                    return content
    except:
        pass
    
    # 방법 3: 파일명에서 메타데이터 추출
    filename = file_path.name
    expiry_date = extract_expiry_from_filename_improved(filename)
    
    if expiry_date:
        content = f"[파일명 메타데이터에서 추출] 유효기간: {expiry_date.strftime('%Y년 %m월 %d일')}"
        return content
    
    # 모든 방법 실패 시 기본 메시지
    return f"[HWP 파일: {file_path.name} - 컨텐츠 추출 실패]"

def collect_target_files(base_path):
    """타겟 파일 수집 - 병렬 처리 최적화"""
    print("분석할 파일 찾는 중...")
    file_extensions = ['.docx', '.xlsx', '.pptx', '.hwp', '.txt', '.pdf']
    
    target_files = []
    keywords = ['준법', '감시', '심의', '유효기간', '만료']
    
    # 디렉토리 목록 생성
    all_dirs = []
    try:
        for d in Path(base_path).iterdir():
            if d.is_dir():
                all_dirs.append(d)
    except Exception as e:
        print(f"디렉토리 목록 생성 중 오류: {e}")
    
    if not all_dirs:  # 하위 디렉토리가 없으면 현재 디렉토리 검색
        all_dirs = [Path(base_path)]
    
    print(f"총 {len(all_dirs)}개 디렉토리 검색 예정")
    
    # 병렬 파일 검색 - 간소화 버전
    def search_files_in_directory(directory):
        found_files = []
        for ext in file_extensions:
            try:
                for file_path in Path(directory).glob(f'**/*{ext}'):
                    if any(keyword in file_path.name for keyword in keywords):
                        found_files.append(file_path)
            except Exception as e:
                print(f"디렉토리 {directory} 검색 중 오류: {e}")
        return found_files
    
    # 파일 검색 (병렬 처리)
    target_files = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
        search_results = list(executor.map(search_files_in_directory, all_dirs))
        for result in search_results:
            target_files.extend(result)
    
    print(f"총 {len(target_files)}개 파일을 찾았습니다.")
    return target_files

4. 준법감시 정보 추출 클래스 정의

In [4]:
class EnhancedComplianceExtractor:
    def __init__(self, api_key=None, use_ngram=True, use_ai=False):
        self.api_key = api_key
        self.use_ngram = use_ngram
        self.use_ai = use_ai
        self.chunk_size = 1000  # 최적 청크 크기 (GPT API 토큰 한도 고려)
        self.chunk_overlap = 200  # 맥락 유지를 위한 중첩 크기
        
        # 키워드 및 패턴 정의
        self.compliance_keywords = [
            '준법감시', '준법감시인', '준법감사', '심의필', '제호', 
            '승인', '결재', '법규', '컴플라이언스'
        ]
        
        self.validity_keywords = [
            '유효기간', '만료일', '유효', '만료', '기간', 
            '까지', '효력', '사용기한'
        ]
        
    def create_semantic_chunks(self, text):
        """의미론적 청크 생성 - 문장/단락 단위 분할"""
        if not text or len(text) < self.chunk_size:
            return [text] if text else []
        
        # 단락 기반 분할 (더 의미있는 청크)
        paragraphs = re.split(r'\n\s*\n', text)
        
        chunks = []
        current_chunk = ""
        
        for para in paragraphs:
            # 단일 단락이 청크 크기보다 크면 문장으로 분할
            if len(para) > self.chunk_size:
                sentences = re.split(r'(?<=[.!?])\s+', para)
                for sentence in sentences:
                    if len(current_chunk) + len(sentence) <= self.chunk_size:
                        current_chunk += sentence + " "
                    else:
                        chunks.append(current_chunk.strip())
                        current_chunk = sentence + " "
            # 청크 크기 이내면 단락 단위로 추가
            elif len(current_chunk) + len(para) <= self.chunk_size:
                current_chunk += para + "\n\n"
            else:
                chunks.append(current_chunk.strip())
                current_chunk = para + "\n\n"
        
        # 마지막 청크 추가
        if current_chunk.strip():
            chunks.append(current_chunk.strip())
        
        # 청크 중첩 처리로 맥락 유지
        overlapped_chunks = []
        for i in range(len(chunks)):
            if i < len(chunks) - 1:
                # 현재 청크와 다음 청크의 일부를 중첩
                overlap_size = min(self.chunk_overlap, len(chunks[i]), len(chunks[i+1]))
                
                if i == 0:
                    overlapped_chunks.append(chunks[i])
                
                # 중첩 청크: 현재 청크의 끝 + 다음 청크의 시작
                current_end = chunks[i][-overlap_size:] if overlap_size > 0 else ""
                next_start = chunks[i+1][:overlap_size] if overlap_size > 0 else ""
                
                overlapped_chunks.append(current_end + next_start)
            
            if i == len(chunks) - 1:
                overlapped_chunks.append(chunks[i])
        
        return overlapped_chunks if overlapped_chunks else chunks
        
    def score_chunks_by_relevance(self, chunks):
        """준법감시 관련성 기준으로 청크 점수화 및 필터링"""
        scored_chunks = []
        
        for i, chunk in enumerate(chunks):
            # 키워드 기반 점수
            keyword_score = 0
            for keyword in self.compliance_keywords:
                keyword_score += chunk.lower().count(keyword.lower()) * 5
            
            for keyword in self.validity_keywords:
                keyword_score += chunk.lower().count(keyword.lower()) * 5
            
            # 정규식 패턴 기반 점수
            pattern_score = 0
            compliance_patterns = [
                r'제\d{4}-\d+호',  # 제2024-4806호
                r'준법감시[_\-\s]*\d{4}[_\-\s]*\d+',  # 준법감시-2024-123
                r'심의필[_\-\s]*\d{4}[_\-\s]*\d+',    # 심의필-2024-123
            ]
            
            date_patterns = [
                r'유효기간[^0-9]*\d{4}[-/.년\s]\d{1,2}[-/.월\s]\d{1,2}일?',
                r'만료일[^0-9]*\d{4}[-/.년\s]\d{1,2}[-/.월\s]\d{1,2}일?'
            ]
            
            for pattern in compliance_patterns + date_patterns:
                if re.search(pattern, chunk):
                    pattern_score += 20
            
            # 종합 점수 및 저장
            total_score = keyword_score + pattern_score
            if total_score > 0:  # 관련성 있는 청크만 저장
                scored_chunks.append((i, chunk, total_score))
        
        # 점수 기준 정렬 및 상위 청크 선택
        scored_chunks.sort(key=lambda x: x[2], reverse=True)
        
        # 상위 청크 반환 (최대 5개, 또는 점수 30 이상인 모든 청크)
        top_chunks = [(idx, chunk) for idx, chunk, score in scored_chunks if score >= 30]
        if len(top_chunks) > 5:
            top_chunks = top_chunks[:5]
        
        return top_chunks
    
    def call_gpt_api(self, chunk):
        """GPT API 호출하여 준법감시 정보 추출"""
        if not self.use_ai or not self.api_key:
            return None
        
        try:
            import openai
            
            # API 키 설정
            openai.api_key = self.api_key
            
            # 프롬프트 작성
            prompt = f"""
            아래 텍스트에서 산업은행/기업은행(IBK) 문서의 준법감시필 번호와 유효기간 정보를 추출해주세요.
            
            준법감시필 번호는 다음 형식 중 하나일 수 있습니다:
            - 제2024-4806호
            - 준법감시-2024-123
            - 심의필-2024-456
            
            유효기간은 다음 형식 중 하나일 수 있습니다:
            - 유효기간: 2025.08.20
            - 만료일: 2025년 8월 20일
            - 2025.08.20까지 유효
            - 2025년 8월 20일까지 효력이 유지됩니다
            
            텍스트:
            {chunk[:1500]}  # 너무 긴 경우 앞부분만 전송
            
            JSON 형식으로 응답해주세요:
            {{
                "compliance_number": "추출된 준법감시필 번호(모르면 null)",
                "expiry_date": "추출된 유효기간을 YYYY-MM-DD 형식으로(모르면 null)",
                "confidence": 0.9, // 신뢰도 (0.0~1.0)
                "context": "정보가 발견된 문맥(한 문장)"
            }}
            """
            
            # GPT API 호출
            try:
                # ChatCompletion API 호출
                response = openai.ChatCompletion.create(
                    model="gpt-4", # 또는 "gpt-3.5-turbo"
                    messages=[
                        {"role": "system", "content": "당신은 금융 문서 분석 전문가입니다. 준법감시 관련 정보를 정확히 추출합니다."},
                        {"role": "user", "content": prompt}
                    ],
                    temperature=0.1  # 결정적인 응답 위해 temperature 낮게 설정
                )
                
                # 응답 추출
                ai_response = response.choices[0].message['content']
                
            except AttributeError:
                # 최신 OpenAI 패키지 버전(1.0.0 이상)에서는 다른 호출 방식 사용
                client = openai.OpenAI(api_key=self.api_key)
                response = client.chat.completions.create(
                    model="gpt-4", # 또는 "gpt-3.5-turbo"
                    messages=[
                        {"role": "system", "content": "당신은 금융 문서 분석 전문가입니다. 준법감시 관련 정보를 정확히 추출합니다."},
                        {"role": "user", "content": prompt}
                    ],
                    temperature=0.1
                )
                
                # 응답 추출
                ai_response = response.choices[0].message.content
            
            # JSON 파싱
            import json
            try:
                log_debug(f"GPT API 응답: {ai_response}")
                result = json.loads(ai_response)
                return result
            except json.JSONDecodeError:
                log_debug(f"JSON 파싱 오류: {ai_response}")
                
                # 간단한 응답 구문 분석 시도
                try:
                    # 형식이 벗어난 경우 핵심 정보만 추출 시도
                    compliance_pattern = r'"compliance_number":\s*"([^"]*)"'
                    expiry_pattern = r'"expiry_date":\s*"([^"]*)"'
                    confidence_pattern = r'"confidence":\s*([\d.]+)'
                    
                    compliance_match = re.search(compliance_pattern, ai_response)
                    expiry_match = re.search(expiry_pattern, ai_response)
                    confidence_match = re.search(confidence_pattern, ai_response)
                    
                    return {
                        "compliance_number": compliance_match.group(1) if compliance_match else None,
                        "expiry_date": expiry_match.group(1) if expiry_match else None,
                        "confidence": float(confidence_match.group(1)) if confidence_match else 0.5,
                        "context": None
                    }
                except:
                    return {
                        "compliance_number": None,
                        "expiry_date": None,
                        "confidence": 0.0,
                        "context": None
                    }
                    
        except Exception as e:
            log_debug(f"GPT API 호출 오류: {str(e)}")
            return {
                "compliance_number": None,
                "expiry_date": None,
                "confidence": 0.0,
                "context": None
            }
    
    def extract_ngram_features(self, text, n=3):
        """N-gram 특성 추출"""
        from nltk.util import ngrams
        from nltk.tokenize import word_tokenize
        
        # 토큰화
        tokens = word_tokenize(text.lower())
        
        # n-gram 생성
        n_grams = list(ngrams(tokens, n))
        return ['_'.join(gram) for gram in n_grams]
    
    def extract_with_regex(self, text):
        """정규식으로 준법감시 정보 추출 - 개선된 버전"""
        log_debug("정규식으로 준법감시 정보 추출 시작")
        result = {"compliance_number": None, "expiry_date": None}
        
        if not text or len(text) < 10:
            return result
        
        # 디버깅을 위한 로깅 강화
        log_debug(f"분석할 텍스트 길이: {len(text)} 자")
        if len(text) > 200:
            log_debug(f"텍스트 일부: {text[:200]}...")
        else:
            log_debug(f"텍스트 전체: {text}")
        
        # 1. 준법감시필 번호 추출 - 패턴 추가
        compliance_patterns = [
            r'제(\d{4})-(\d+)호',                         # 제2024-4806호
            r'준법감시[_\-\s]*(\d{4})[_\-\s]*(\d+)',      # 준법감시-2024-123
            r'심의필[_\-\s]*(\d{4})[_\-\s]*(\d+)',        # 심의필-2024-123
            r'준법[_\-\s]*(\d{4})[_\-\s]*(\d+)',          # 준법-2024-123
            r'(감사|감수|검수)[_\-\s]*(\d{4})[_\-\s]*(\d+)', # 감사-2024-123
            r'[^\d](\d{4})[_\-](\d{3,4})호',              # 2024-1234호
            r'[^\d](\d{4})[\s\-]*(\d{3,4})',              # 2024 1234 (앞뒤 문맥 확인 필요)
        ]
        
        for pattern in compliance_patterns:
            matches = re.finditer(pattern, text)
            for match in matches:
                try:
                    groups = match.groups()
                    
                    # 패턴에 맞는 준법감시필 번호 구성
                    if '제' in pattern:
                        result["compliance_number"] = f"제{groups[0]}-{groups[1]}호"
                    elif '준법감시' in pattern:
                        result["compliance_number"] = f"준법감시-{groups[0]}-{groups[1]}"
                    elif '심의필' in pattern:
                        result["compliance_number"] = f"심의필-{groups[0]}-{groups[1]}"
                    elif '감사' in pattern or '감수' in pattern or '검수' in pattern:
                        result["compliance_number"] = f"{groups[0]}-{groups[1]}-{groups[2]}"
                    else:
                        # 기본 형식
                        year, number = groups[0], groups[1]
                        # 문맥 기반 접두어 결정 (간단한 휴리스틱)
                        context_before = text[max(0, match.start()-20):match.start()]
                        if '준법' in context_before or '법규' in context_before:
                            result["compliance_number"] = f"준법-{year}-{number}"
                        elif '심의' in context_before:
                            result["compliance_number"] = f"심의필-{year}-{number}"
                        else:
                            result["compliance_number"] = f"문서번호-{year}-{number}"
                    
                    log_debug(f"정규식으로 준법감시필 번호 추출: {result['compliance_number']}")
                    break
                except:
                    continue
        
        # 2. 유효기간 추출 - 패턴 추가
        expiry_patterns = [
            r'유효기간[^0-9]*(\d{4})[-/.년\s](\d{1,2})[-/.월\s](\d{1,2})일?',  # 유효기간: 2025.08.20
            r'만료일[^0-9]*(\d{4})[-/.년\s](\d{1,2})[-/.월\s](\d{1,2})일?',   # 만료일: 2025.08.20
            r'(\d{4})[-/.년\s](\d{1,2})[-/.월\s](\d{1,2})일?[^0-9]*까지',     # 2025.08.20까지
            r'(\d{4})[-/.년\s](\d{1,2})[-/.월\s](\d{1,2})일?[^0-9]*만료',     # 2025.08.20 만료
            r'유효[^0-9]*(\d{4})[-/.년\s](\d{1,2})[-/.월\s](\d{1,2})일?',     # 유효: 2025.08.20
            r'효력기간[^0-9]*(\d{4})[-/.년\s](\d{1,2})[-/.월\s](\d{1,2})일?',  # 효력기간: 2025.08.20
            r'([~-]|부터)[^0-9]*(\d{4})[-/.년\s](\d{1,2})[-/.월\s](\d{1,2})일?', # ~2025.08.20
            r'기간[^\d]*(\d{4})[-/.년\s](\d{1,2})[-/.월\s](\d{1,2})일?',       # 기간: 2025.08.20
            r'사용기한[^\d]*(\d{4})[-/.년\s](\d{1,2})[-/.월\s](\d{1,2})일?',   # 사용기한: 2025.08.20
        ]
        
        for pattern in expiry_patterns:
            matches = re.finditer(pattern, text)
            for match in matches:
                try:
                    groups = match.groups()
                    # 연, 월, 일 값 가져오기 (패턴에 따라 다름)
                    if len(groups) >= 3:
                        year, month, day = int(groups[0]), int(groups[1]), int(groups[2])
                    else:
                        # 그룹이 예상과 다를 경우 계속
                        continue
                    
                    # 날짜 유효성 검사
                    if 2000 <= year <= 2100 and 1 <= month <= 12 and 1 <= day <= 31:
                        date_str = f"{year}-{month:02d}-{day:02d}"
                        
                        # 오류 날짜 필터링
                        if date_str in ERROR_DATES:
                            log_debug(f"오류 날짜 감지: {date_str}")
                            continue
                        
                        # 추가 검증: 과거 또는 먼 미래 날짜 검증
                        today = datetime.now().date()
                        date_obj = datetime(year, month, day).date()
                        days_diff = (date_obj - today).days
                        
                        # 5년 이상 과거나 5년 이상 미래는 의심
                        if abs(days_diff) > 365 * 5:
                            log_debug(f"의심스러운 날짜: {date_str} (차이: {days_diff}일)")
                            # 너무 과거나 먼 미래 날짜는 신뢰도 떨어짐 - 낮은 점수 부여
                            continue
                        
                        result["expiry_date"] = date_str
                        log_debug(f"정규식으로 유효기간 추출: {date_str}")
                        break
                except:
                    continue
                        
        return result
    
    def combine_results(self, regex_result, gpt_result, ngram_features):
        """여러 추출 결과 통합 - 가중치 기반"""
        # 초기화
        final_result = {
            "compliance_number": None,
            "expiry_date": None,
            "confidence": 0.0,
            "context": None,
            "ngram_result": {},
            "gpt_result": {}
        }
        
        # 정규식 결과 처리 (가중치: 0.6)
        if regex_result:
            final_result["compliance_number"] = regex_result.get("compliance_number")
            final_result["expiry_date"] = regex_result.get("expiry_date")
            final_result["confidence"] = 0.6
            final_result["ngram_result"] = regex_result
        
        # GPT 결과 처리 (가중치: 0.8) - GPT API가 활성화된 경우
        if gpt_result:
            gpt_confidence = gpt_result.get("confidence", 0.0) * 0.8
            
            # 신뢰도가 높은 경우에만 기존 결과 대체
            if gpt_confidence > final_result["confidence"]:
                final_result["compliance_number"] = gpt_result.get("compliance_number")
                final_result["expiry_date"] = gpt_result.get("expiry_date")
                final_result["confidence"] = gpt_confidence
                final_result["context"] = gpt_result.get("context")
            
            # 일치하는 경우 신뢰도 상승
            elif (final_result["compliance_number"] == gpt_result.get("compliance_number") or
                  final_result["expiry_date"] == gpt_result.get("expiry_date")):
                final_result["confidence"] += 0.2
                
            final_result["gpt_result"] = gpt_result
        
        # N-gram 결과 활용 (신뢰도 미세 조정)
        if ngram_features and (final_result["compliance_number"] or final_result["expiry_date"]):
            final_result["confidence"] = min(1.0, final_result["confidence"] + 0.1)
        
        return final_result
    
    def process_document(self, content):
        """문서 내용에서 준법감시필 번호와 유효기간 추출"""
        log_debug("문서 내용 분석 시작")
        
        if not content or not isinstance(content, str):
            return {"compliance_number": None, "expiry_date": None, "confidence": 0.0}
        
        # 1. 텍스트 청크 분할
        chunks = self.create_semantic_chunks(content)
        
        # 2. 관련성 높은 청크 선별
        relevant_chunks = self.score_chunks_by_relevance(chunks)
        
        # 청크가 없으면 전체 텍스트에 대해 간단 분석
        if not relevant_chunks and content:
            mini_chunk = content[:500]
            regex_result = self.extract_with_regex(mini_chunk)
            ai_result = self.call_gpt_api(mini_chunk) if self.use_ai else None
            ngram_features = self.extract_ngram_features(mini_chunk) if self.use_ngram else None
            return self.combine_results(regex_result, ai_result, ngram_features)
        
        # 청크별 결과 저장
        chunk_results = []
        
        # 3. 각 청크별 분석
        for idx, chunk in relevant_chunks:
            regex_result = self.extract_with_regex(chunk)
            ai_result = self.call_gpt_api(chunk) if self.use_ai else None
            ngram_features = self.extract_ngram_features(chunk) if self.use_ngram else None
            
            combined = self.combine_results(regex_result, ai_result, ngram_features)
            combined["chunk_index"] = idx
            chunk_results.append(combined)
        
        # 결과가 없으면 빈 결과 반환
        if not chunk_results:
            return {"compliance_number": None, "expiry_date": None, "confidence": 0.0}
        
        # 4. 최종 결과 선택 (신뢰도 기준)
        chunk_results.sort(key=lambda x: x.get("confidence", 0.0), reverse=True)
        return chunk_results[0]

5. 파일 분석 및 결과 처리 함수

In [5]:
def analyze_file(file_path):
    """파일 정보를 분석하여 결과 딕셔너리 반환 - 강화된 버전"""
    try:
        filename = file_path.name
        file_index = target_files.index(file_path) if file_path in target_files else -1
        
        # 파일 순번 로깅
        log_debug(f"{'='*50}")
        log_debug(f"파일 분석 시작 [{file_index+1}/{len(target_files)}]: {filename}")
        
        # 파일명에서 정보 추출
        filename_info = extract_info_from_filename(filename)
        
        # 파일 내용 확인 변수 초기화
        content_result = {
            "compliance_number": None,
            "expiry_date": None,
            "confidence": 0.0,
            "context": None
        }
        
        content = ""  # 파일 내용 저장 변수
        
        # 파일 확장자 확인
        ext = file_path.suffix.lower()
        
        # 파일 크기 확인
        try:
            file_size = file_path.stat().st_size / (1024 * 1024)  # MB 단위
            is_large_file = file_size > 10  # 10MB 초과
            
            if is_large_file:
                log_debug(f"대용량 파일 감지: {file_size:.2f}MB - 내용 분석 제한")
                content = read_file_content(file_path)[:20000]  # 처음 2만자만 분석
            else:
                content = read_file_content(file_path)
                
            # 내용이 있는 경우 분석
            if content:
                # AI 추출기 인스턴스 생성
                extractor = EnhancedComplianceExtractor(
                    api_key=None,  # 실제 API 키로 교체
                    use_ngram=True,
                    use_ai=False  # API 키가 없으면 AI 사용 안함
                )
                
                # 문서 처리
                content_result = extractor.process_document(content)
        except Exception as e:
            log_debug(f"파일 크기 확인 또는 내용 분석 오류: {str(e)}")
        
        # 결과 통합 - 파일 경로와 내용 미리보기 추가
        combined_result = combine_file_results(
            filename=filename,
            content_result=content_result,
            filename_result=filename_info,
            file_path=file_path,
            content_preview=content[:500] if content else None  # 최대 500자 미리보기
        )
        
        return combined_result
        
    except Exception as e:
        log_debug(f"파일 분석 중 오류: {str(e)}")
        log_debug(traceback.format_exc())
        # 오류 발생 시 기본 정보만 반환
        return {
            '파일명': file_path.name,
            '상태': '상태 불명',
            '만료일': '알 수 없음',
            '남은 일수': None,
            '파일명_준법감시필': None, 
            '파일명_유효기간': None,
            '파일명_유효기간_상태': '정보 없음',
            '파일명_남은일수': None,
            '파일내_준법감시필_번호': None,
            '파일내_유효기간_날짜': None,
            '파일내_유효기간_상태': '정보 없음',
            '파일내_남은일수': None,
            '파일경로': str(file_path.parent),
            '내용_미리보기': None,
            '파일인덱스': target_files.index(file_path) + 1 if file_path in target_files else -1
        }

def combine_file_results(filename, content_result, filename_result, file_path=None, content_preview=None):
    """파일명과 내용에서 추출한 결과 통합"""
    # 기본값 설정
    result = {
        '파일명': filename,
        '상태': '상태 불명',
        '만료일': '알 수 없음',
        '남은 일수': None,
        
        # 파일명 정보
        '파일명_준법감시필': filename_result.get('compliance_number'),
        '파일명_유효기간': filename_result.get('expiry_date'),
        '파일명_유효기간_상태': filename_result.get('status', '정보 없음'),
        '파일명_남은일수': filename_result.get('days_to_expiry'),
        
        # 파일 내용 정보
        '파일내_준법감시필_번호': content_result.get('compliance_number'),
        '파일내_유효기간_날짜': content_result.get('expiry_date'),
        '파일내_유효기간_상태': '정보 없음',
        '파일내_남은일수': None,
        
        '신뢰도': content_result.get('confidence', 0.0),
        '파일경로': str(file_path) if file_path else None,
        '내용_미리보기': content_preview[:200] + '...' if content_preview and len(content_preview) > 200 else content_preview
    }
    
    # 파일 내용에서 유효기간 상태 계산
    content_expiry = content_result.get('expiry_date')
    if content_expiry:
        try:
            today = datetime.now().date()
            content_date_obj = datetime.strptime(content_expiry, '%Y-%m-%d').date()
            content_days_left = (content_date_obj - today).days
            
            result['파일내_남은일수'] = content_days_left
            
            if content_days_left < 0:
                result['파일내_유효기간_상태'] = '만료됨'
            elif content_days_left <= 30:
                result['파일내_유효기간_상태'] = '30일 이내 만료'
            else:
                result['파일내_유효기간_상태'] = '유효함'
        except Exception as e:
            log_debug(f"내용 유효기간 상태 계산 오류: {e}")
    
    # 최종 만료일 및 상태 결정 (파일명 우선, 내용 차선)
    if filename_result.get('expiry_date'):
        result['만료일'] = filename_result.get('expiry_date')
        result['상태'] = filename_result.get('status')
        result['남은 일수'] = filename_result.get('days_to_expiry')
    elif content_expiry:
        result['만료일'] = content_expiry
        result['상태'] = result['파일내_유효기간_상태']
        result['남은 일수'] = result['파일내_남은일수']
    
    return result

def save_interim_results(results, batch_idx):
    """중간 결과 저장 함수"""
    try:
        interim_df = pd.DataFrame(results)
        interim_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        interim_file = f"interim_results_{batch_idx}_{interim_timestamp}.csv"
        interim_df.to_csv(interim_file, index=False, encoding='utf-8-sig')
        print(f"중간 결과 저장됨: {interim_file}")
        return interim_file
    except Exception as e:
        print(f"중간 결과 저장 중 오류: {e}")
        return None

def save_formatted_excel(df, excel_file):
    """결과를 서식이 적용된 Excel로 저장 - 최적화 버전"""
    try:
        # 대용량 데이터 처리를 위한 최적화 설정
        options = {'strings_to_urls': False, 'strings_to_formulas': False}
        
        with pd.ExcelWriter(excel_file, engine='openpyxl', options=options) as writer:
            # 기본 저장 (서식 없이)
            df.to_excel(writer, index=False, sheet_name='준법감시')
            
            # 워크북과 워크시트 가져오기
            workbook = writer.book
            worksheet = writer.sheets['준법감시']
            
            # 간소화된 서식 (속도 개선)
            # 헤더 행만 서식 적용
            header_font = Font(bold=True)
            header_fill = PatternFill(start_color='E6E6E6', end_color='E6E6E6', fill_type='solid')
            
            for cell in worksheet[1]:
                cell.font = header_font
                cell.fill = header_fill
            
            # 열 너비는 최소한으로만 설정
            for i, column in enumerate(df.columns):
                col_letter = get_column_letter(i + 1)
                worksheet.column_dimensions[col_letter].width = max(len(str(column)), 15)
            
        print(f"Excel 파일이 저장되었습니다: {excel_file}")
        return True
    except Exception as e:
        print(f"Excel 저장 오류: {e}")
        return False

6. 메인 실행 함수

In [6]:
def run_enhanced_compliance_analysis(directory_path):
    """IBK 준법감시 문서 향상된 분석 - 메인 실행 함수"""
    global target_files
    
    # 초기 설정
    start_time = time.time()
    print(f"현재 디렉토리: {os.getcwd()}")
    
    # 1. 파일 수집
    target_files = collect_target_files(directory_path)
    print(f"파일 분석 시작...")
    
    max_files = 1000  # 처리할 최대 파일 수
    if len(target_files) > max_files:
        target_files = target_files[:max_files]
        print(f"처리할 파일 수를 {max_files}개로 제한합니다.")

    # 2. 배치 처리 설정
    batch_size = 200  # 한 번에 처리할 파일 수
    total_batches = (len(target_files) + batch_size - 1) // batch_size
    print(f"총 {total_batches}개 배치로 나누어 처리합니다.")
    
    # 3. 배치별 처리 및 중간 결과 저장
    all_results = []
    for batch_idx in range(total_batches):
        batch_start = batch_idx * batch_size
        batch_end = min((batch_idx + 1) * batch_size, len(target_files))
        batch_files = target_files[batch_start:batch_end]
        
        print(f"\n배치 {batch_idx + 1}/{total_batches} 처리 중 ({batch_end - batch_start}개 파일)...")
        
        # 병렬 처리
        batch_results = []
        max_workers = min(os.cpu_count() or 2, 4)  # 최대 8개 워커
        
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            # 각 파일별로 분석 작업 예약
            future_to_file = {executor.submit(analyze_file, file_path): file_path for file_path in batch_files}
            
            # 완료된 작업 처리
            for future in tqdm(concurrent.futures.as_completed(future_to_file), total=len(batch_files), desc=f"배치 {batch_idx + 1} 분석 중"):
                try:
                    result = future.result()
                    if result:
                        batch_results.append(result)
                except Exception as e:
                    file_path = future_to_file[future]
                    print(f"\n파일 {file_path.name} 처리 중 오류: {e}")
        
        # 현재 배치 결과 저장
        all_results.extend(batch_results)
        print(f"배치 {batch_idx + 1} 완료: {len(batch_results)}개 파일 처리됨")
        
        # 메모리 정리
        gc.collect()
        
        # 중간 결과 저장
        if batch_idx % 2 == 1 or batch_idx == total_batches - 1:  # 2개 배치마다 또는 마지막 배치
            save_interim_results(all_results, batch_idx)
    
    # 4. 최종 결과 데이터프레임 생성
    print(f"총 {len(all_results)}개 파일 분석 완료")
    
    if all_results:
        df = pd.DataFrame(all_results)
        
        # 유효기간 날짜 검증 및 오류 수정
        def validate_date(date_str):
            """유효기간 날짜 검증 함수"""
            if not isinstance(date_str, str) or date_str == '알 수 없음':
                return date_str
                
            # 오류 날짜 명시적으로 제거
            if date_str in ERROR_DATES:
                return None
                
            try:
                # 날짜 형식 검증
                date_obj = datetime.strptime(date_str, '%Y-%m-%d')
                
                # 추가 유효성 검증 (너무 과거나 먼 미래의 날짜는 의심)
                today = datetime.now()
                five_years_ago = today.replace(year=today.year - 5)
                five_years_future = today.replace(year=today.year + 5)
                
                if date_obj < five_years_ago or date_obj > five_years_future:
                    # 너무 이상한 날짜는 로그로 기록
                    print(f"의심스러운 날짜 검출: {date_str} (허용 범위: {five_years_ago.strftime('%Y-%m-%d')} ~ {five_years_future.strftime('%Y-%m-%d')})")
                
                return date_str
            except:
                return None
        
        # 모든 날짜 컬럼에 대한 검증 적용
        for col in ['만료일', '파일명_유효기간', '파일내_유효기간_날짜']:
            if col in df.columns:
                df[col] = df[col].apply(validate_date)
        
        # 유효기간 날짜가 None인 경우 상태도 '상태 불명'으로 변경
        df.loc[df['만료일'].isna() | (df['만료일'] == '알 수 없음'), '상태'] = '상태 불명'
        
        # 결과 열 정리 - 필요한 컬럼만 선택
        column_order = [
            '파일명', '상태', '만료일', '남은 일수', 
            '파일명_준법감시필', '파일명_유효기간', '파일명_유효기간_상태', '파일명_남은일수',
            '파일내_준법감시필_번호', '파일내_유효기간_날짜', '파일내_유효기간_상태', '파일내_남은일수',
            '파일경로', '내용_미리보기'
        ]
        
        # 존재하는 열만 선택
        existing_columns = [col for col in column_order if col in df.columns]
        df = df[existing_columns]
        
        # 결과 정렬 (상태별)
        status_order = {'만료됨': 0, '30일 이내 만료': 1, '유효함': 2, '상태 불명': 3}
        df['status_order'] = df['상태'].map(lambda x: status_order.get(x, 4))
        df = df.sort_values(by=['status_order', '남은 일수'])
        df = df.drop(columns=['status_order'])
        
        # 5. 결과 저장
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        csv_file = f"ibk_준법감시_유효기간_{timestamp}.csv"
        excel_file = f"ibk_준법감시_유효기간_{timestamp}.xlsx"
        
        df.to_csv(csv_file, index=False, encoding='utf-8-sig')
        print(f"결과 파일이 현재 디렉토리에 저장되었습니다: {csv_file}")
        
        save_formatted_excel(df, excel_file)
        
        # 6. 결과 통계
        status_counts = df['상태'].value_counts().to_dict()
        print("\n=== 분석 결과 통계 ===")
        print(f"- 만료됨: {status_counts.get('만료됨', 0)}개")
        print(f"- 30일 이내 만료: {status_counts.get('30일 이내 만료', 0)}개")
        print(f"- 유효함: {status_counts.get('유효함', 0)}개")
        print(f"- 상태 불명: {status_counts.get('상태 불명', 0)}개")
        print(f"- 총 파일 수: {len(df)}개")
        
        # 정보 출처 통계
        source_stats = {
            '파일명에서만 발견': len(df[(df['파일명_유효기간'].notna()) & (df['파일내_유효기간_날짜'].isna())]),
            '파일 내용에서만 발견': len(df[(df['파일명_유효기간'].isna()) & (df['파일내_유효기간_날짜'].notna())]),
            '파일명과 내용 모두에서 발견': len(df[(df['파일명_유효기간'].notna()) & (df['파일내_유효기간_날짜'].notna())]),
            '어디에서도 발견되지 않음': len(df[(df['파일명_유효기간'].isna()) & (df['파일내_유효기간_날짜'].isna())])
        }
        
        print("\n=== 정보 출처 통계 ===")
        for source, count in source_stats.items():
            print(f"- {source}: {count}개 ({count/len(df)*100:.1f}%)")
        
        # 총 소요 시간
        total_time = time.time() - start_time
        print(f"\n총 실행 시간: {total_time:.2f}초 (파일당 평균: {total_time/len(target_files):.2f}초)")
        
        # 결과 미리보기
        print("\n=== 결과 데이터프레임 미리보기 ===")
        display(df.head())
        
        return df
    else:
        print("분석할 파일이 없거나 모든 파일 처리 중 오류가 발생했습니다.")
        return None

7. 실행 코드

In [None]:
# 실행 코드
if __name__ == "__main__":
    try:
        # 분석 실행
        result_df = run_enhanced_compliance_analysis(BASE_PATH)
    except Exception as e:
        print(f"실행 중 오류 발생: {e}")
        print(traceback.format_exc())

현재 디렉토리: c:\Users\markcloud\Desktop\오준호\IBK
분석할 파일 찾는 중...
총 7개 디렉토리 검색 예정
총 659개 파일을 찾았습니다.
파일 분석 시작...
총 4개 배치로 나누어 처리합니다.

배치 1/4 처리 중 (200개 파일)...


배치 1 분석 중:   0%|          | 0/200 [00:00<?, ?it/s]

incorrect startxref pointer(3)
incorrect startxref pointer(3)
incorrect startxref pointer(3)
incorrect startxref pointer(3)


배치 1 완료: 200개 파일 처리됨

배치 2/4 처리 중 (200개 파일)...


배치 2 분석 중:   0%|          | 0/200 [00:00<?, ?it/s]

배치 2 완료: 200개 파일 처리됨
중간 결과 저장됨: interim_results_1_20250516_150145.csv

배치 3/4 처리 중 (200개 파일)...


배치 3 분석 중:   0%|          | 0/200 [00:00<?, ?it/s]

incorrect startxref pointer(3)
incorrect startxref pointer(3)
incorrect startxref pointer(3)


배치 3 완료: 200개 파일 처리됨

배치 4/4 처리 중 (59개 파일)...


배치 4 분석 중:   0%|          | 0/59 [00:00<?, ?it/s]

incorrect startxref pointer(3)
incorrect startxref pointer(3)
incorrect startxref pointer(3)
